Moves more MG graph ETL to libcugraph and re-enables MG tests in CI #3941

Merged

Changes from all commits (58 commits)
b59c94f - placeholder for re-enabling the mg testing (jnke2016, Oct 17, 2023)
d07af7d - propose C API changes (ChuckHastings, Nov 7, 2023)
c66a50e - new graph creation implementation in C API (ChuckHastings, Nov 13, 2023)
37278d1 - Merge branch 'branch-23.12' into new_graph_creation_methods (ChuckHastings, Nov 13, 2023)
d470bc6 - add proper test for multiple input lists in MG (ChuckHastings, Nov 14, 2023)
0dbea1a - Merge branch 'branch-23.12' into new_graph_creation_methods (ChuckHastings, Nov 14, 2023)
67e7383 - update branch with the latest changes (jnke2016, Nov 15, 2023)
a56b30a - fetch and merge CAPI graph update (jnke2016, Nov 15, 2023)
d0bf54c - support isolated vertices for sg graph (jnke2016, Nov 15, 2023)
52b3162 - add support for dropping self loops and removing multi edges to C API… (ChuckHastings, Nov 16, 2023)
0cca2cd - Merge branch 'branch-23.12' into new_graph_creation_methods (ChuckHastings, Nov 16, 2023)
d96ba62 - refactor remove_self_loops and sort_and_remove_multi_edges to reduce … (ChuckHastings, Nov 17, 2023)
f1ab784 - add support for isolated vertices, list of edges (jnke2016, Nov 17, 2023)
9242207 - Merge remote-tracking branch 'upstream/new_graph_creation_methods' in… (jnke2016, Nov 17, 2023)
d1b104a - check weights before extracting its type (jnke2016, Nov 17, 2023)
dfafeec - remove deprecated parameter 'num_edges' (jnke2016, Nov 17, 2023)
e7c0cf9 - support list of edges (jnke2016, Nov 17, 2023)
a44a1ae - remove debug print (jnke2016, Nov 17, 2023)
12bf121 - fix style (jnke2016, Nov 17, 2023)
0fc84bd - Merge remote-tracking branch 'upstream/branch-23.12' into branch-23.1… (jnke2016, Nov 17, 2023)
76e5c15 - pass list of edgelist to the plc graph creation (jnke2016, Nov 18, 2023)
172ea07 - update check (jnke2016, Nov 20, 2023)
42913b8 - update data persistence (jnke2016, Nov 20, 2023)
3e233f1 - cleanup code and fix bugs (jnke2016, Nov 20, 2023)
a612a0a - fix style (jnke2016, Nov 20, 2023)
4513d19 - Merge remote-tracking branch 'upstream/branch-23.12' into branch-23.1… (jnke2016, Nov 20, 2023)
cc113db - pass keyword argument to accommodate for the plc graph creation signa… (jnke2016, Nov 20, 2023)
0ceeb4f - update doctests (jnke2016, Nov 20, 2023)
e757ff9 - update doctest examples (jnke2016, Nov 20, 2023)
68e76b8 - re-enable single gpu dask python tests (jnke2016, Nov 20, 2023)
64bb371 - fix style (jnke2016, Nov 20, 2023)
9915497 - update copyright (jnke2016, Nov 20, 2023)
08c0d05 - update copyright (jnke2016, Nov 20, 2023)
6db1050 - lower tolerance (jnke2016, Nov 20, 2023)
39bded6 - fix docstring examples (jnke2016, Nov 20, 2023)
129a226 - Merge branch 'branch-23.12' into branch-23.12_re-enable-mg-testing (naimnv, Nov 20, 2023)
e275714 - Remove another persist and decrease memory footprint of drop_duplicates (VibhuJawa, Nov 21, 2023)
6069f3c - decrease memory footprint of drop_duplicates (VibhuJawa, Nov 21, 2023)
64ec881 - decrease memory footprint of drop_duplicates (VibhuJawa, Nov 21, 2023)
a32fd3d - Revert bulk sampling changes (VibhuJawa, Nov 21, 2023)
5f7d4e5 - Revert bulk sampling changes (VibhuJawa, Nov 21, 2023)
008e1d0 - Merge remote-tracking branch 'upstream/branch-23.12' into branch-23.1… (rlratzel, Nov 22, 2023)
8b3b051 - properly handle smaller graphs (jnke2016, Nov 22, 2023)
51b5a90 - remove extra persist (jnke2016, Nov 22, 2023)
c8c0abe - Merge remote-tracking branch 'upstream/branch-23.12_re-enable-mg-test… (jnke2016, Nov 22, 2023)
3b3d0c6 - fix style (jnke2016, Nov 22, 2023)
2d4e5ac - undo changes when resolving merge conflict (jnke2016, Nov 22, 2023)
e4db01d - clean up code (jnke2016, Nov 22, 2023)
d29c10d - update docstrings (jnke2016, Nov 22, 2023)
3a4255d - update docstrings (jnke2016, Nov 22, 2023)
c607a58 - properly handle list of device arrays and clean up code (jnke2016, Nov 22, 2023)
7f869b3 - explicitly increase the timeout per worker (jnke2016, Nov 22, 2023)
71fb5e5 - temporarily lower the timeout value (jnke2016, Nov 22, 2023)
9f8b131 - fix style and add comment (jnke2016, Nov 22, 2023)
912701a - Merge remote-tracking branch 'upstream/branch-23.12' into branch-23.1… (jnke2016, Nov 22, 2023)
35a6feb - fix style (jnke2016, Nov 22, 2023)
fdeaa57 - refactor distribution of dask objects across workers (jnke2016, Nov 27, 2023)
07bcd2e - fix style (jnke2016, Nov 27, 2023)
2 changes: 1 addition & 1 deletion ci/test_python.sh

@@ -79,7 +79,7 @@ pytest \
   --cov=cugraph \
   --cov-report=xml:"${RAPIDS_COVERAGE_DIR}/cugraph-coverage.xml" \
   --cov-report=term \
-  -k "not _mg" \
+  -k "not test_property_graph_mg" \
   tests
 popd
2 changes: 1 addition & 1 deletion ci/test_wheel.sh

@@ -26,5 +26,5 @@ else
   DASK_DISTRIBUTED__SCHEDULER__WORKER_TTL="1000s" \
   DASK_DISTRIBUTED__COMM__TIMEOUTS__CONNECT="1000s" \
   DASK_CUDA_WAIT_WORKERS_MIN_TIMEOUT="1000s" \
-  python -m pytest -k "not _mg" ./python/${package_name}/${python_package_name}/tests
+  python -m pytest ./python/${package_name}/${python_package_name}/tests
 fi
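
A note on the DASK_* variables kept above: Dask translates such environment variables into its configuration namespace, with double underscores denoting nesting. A minimal sketch of setting equivalent values programmatically, assuming Dask's standard env-var-to-config translation:

# Sketch: in-process equivalent of the DASK_* environment variables above
# (assumption: __ nests keys and single _ becomes - in the config name).
import dask

dask.config.set({
    "distributed.scheduler.worker-ttl": "1000s",
    "distributed.comm.timeouts.connect": "1000s",
})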
3 changes: 2 additions & 1 deletion cpp/include/cugraph_c/graph.h

@@ -103,7 +103,8 @@ cugraph_error_code_t cugraph_sg_graph_create(
  * Note that setting this flag will arbitrarily select one instance of a multi edge to be the
  * edge that survives. If the edges have properties that should be honored (e.g. sum the weights,
- * or take the maximum weight), the caller should do that on not rely on this flag.
+ * or take the maximum weight), the caller should remove specific edges themselves and not rely
+ * on this flag.
  * @param [in] do_expensive_check If true, do expensive checks to validate the input data
  *    is consistent with software assumptions. If false bypass these checks.
  * @param [out] graph A pointer to the graph object
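
The revised comment tells callers to combine or drop multi edges themselves when edge properties matter, since the drop-multi-edges flag keeps an arbitrary survivor. A minimal sketch of such pre-aggregation on the Python side, assuming a cudf edge list with src/dst/wgt columns:

import cudf

edges = cudf.DataFrame({
    "src": [0, 0, 1],
    "dst": [1, 1, 2],
    "wgt": [1.0, 2.0, 5.0],
})

# Sum the weights of duplicate (src, dst) pairs before graph creation,
# rather than letting the drop-multi-edges flag keep an arbitrary one.
deduped = edges.groupby(["src", "dst"], as_index=False).agg({"wgt": "sum"})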
14 changes: 6 additions & 8 deletions python/cugraph/cugraph/dask/community/egonet.py

@@ -18,7 +18,9 @@
 import cugraph.dask.comms.comms as Comms
 import dask_cudf
 import cudf
-from cugraph.dask.common.input_utils import get_distributed_data
+from cugraph.dask.common.part_utils import (
+    persist_dask_df_equal_parts_per_worker,
+)

 from pylibcugraph import ResourceHandle, ego_graph as pylibcugraph_ego_graph

@@ -135,25 +137,21 @@ def ego_graph(input_graph, n, radius=1, center=True):
     n = dask_cudf.from_cudf(n, npartitions=min(input_graph._npartitions, len(n)))
     n = n.astype(n_type)

-    n = get_distributed_data(n)
-    wait(n)
-
-    n = n.worker_to_parts
-
+    n = persist_dask_df_equal_parts_per_worker(n, client, return_type="dict")
     do_expensive_check = False

     result = [
         client.submit(
             _call_ego_graph,
             Comms.get_session_id(),
             input_graph._plc_graph[w],
-            n[w][0],
+            n_[0] if n_ else cudf.Series(dtype=n_type),
             radius,
             do_expensive_check,
             workers=[w],
             allow_other_workers=False,
         )
-        for w in Comms.get_workers()
+        for w, n_ in n.items()
     ]
     wait(result)
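
This same replacement of the get_distributed_data / worker_to_parts idiom with persist_dask_df_equal_parts_per_worker(..., return_type="dict") recurs through the rest of the PR. A condensed sketch of the consuming pattern, with hypothetical fn and session_id names, assuming the helper returns a dict mapping every worker to a possibly empty list of persisted partitions:

import cudf

def submit_per_worker(client, plc_graph, parts, dtype, fn, session_id):
    # parts: {worker_address: [cudf-backed partition, ...]}; a worker that
    # received no rows maps to an empty list, so it is handed a typed empty
    # Series and still participates in the collective call.
    return [
        client.submit(
            fn,
            session_id,
            plc_graph[w],
            p[0] if p else cudf.Series(dtype=dtype),
            workers=[w],
            allow_other_workers=False,
        )
        for w, p in parts.items()
    ]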
19 changes: 9 additions & 10 deletions python/cugraph/cugraph/dask/community/induced_subgraph.py

@@ -19,7 +19,9 @@
 import dask_cudf
 import cudf
 import cupy as cp
-from cugraph.dask.common.input_utils import get_distributed_data
+from cugraph.dask.common.part_utils import (
+    persist_dask_df_equal_parts_per_worker,
+)
 from typing import Union, Tuple

 from pylibcugraph import (

@@ -154,15 +156,12 @@ def induced_subgraph(
     vertices_type = input_graph.input_df.dtypes[0]

     if isinstance(vertices, (cudf.Series, cudf.DataFrame)):
-        vertices = dask_cudf.from_cudf(
-            vertices, npartitions=min(input_graph._npartitions, len(vertices))
-        )
+        vertices = dask_cudf.from_cudf(vertices, npartitions=input_graph._npartitions)
         vertices = vertices.astype(vertices_type)

-    vertices = get_distributed_data(vertices)
-    wait(vertices)
-
-    vertices = vertices.worker_to_parts
+    vertices = persist_dask_df_equal_parts_per_worker(
+        vertices, client, return_type="dict"
+    )

     do_expensive_check = False

@@ -171,13 +170,13 @@
             _call_induced_subgraph,
             Comms.get_session_id(),
             input_graph._plc_graph[w],
-            vertices[w][0],
+            vertices_[0] if vertices_ else cudf.Series(dtype=vertices_type),
             offsets,
             do_expensive_check,
             workers=[w],
             allow_other_workers=False,
         )
-        for w in Comms.get_workers()
+        for w, vertices_ in vertices.items()
     ]
     wait(result)
16 changes: 13 additions & 3 deletions python/cugraph/cugraph/dask/community/leiden.py

@@ -125,9 +125,19 @@ def leiden(
     Examples
     --------
-    >>> from cugraph.datasets import karate
-    >>> G = karate.get_graph(fetch=True)
-    >>> parts, modularity_score = cugraph.leiden(G)
+    >>> import cugraph.dask as dcg
+    >>> import dask_cudf
+    >>> # ... Init a DASK Cluster
+    >>> #    see https://docs.rapids.ai/api/cugraph/stable/dask-cugraph.html
+    >>> # Download dataset from https://github.com/rapidsai/cugraph/datasets/..
+    >>> chunksize = dcg.get_chunksize(datasets_path / "karate.csv")
+    >>> ddf = dask_cudf.read_csv(datasets_path / "karate.csv",
+    ...                          chunksize=chunksize, delimiter=" ",
+    ...                          names=["src", "dst", "value"],
+    ...                          dtype=["int32", "int32", "float32"])
+    >>> dg = cugraph.Graph()
+    >>> dg.from_dask_cudf_edgelist(ddf, source='src', destination='dst')
+    >>> parts, modularity_score = dcg.leiden(dg)

     """
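
The doctest above (and the matching louvain one below) assumes a datasets_path that already points at a local copy of the csv datasets; a minimal setup sketch, with an illustrative path:

# Setup assumed by the doctest (sketch; the directory is hypothetical).
from pathlib import Path

datasets_path = Path("datasets")  # directory containing karate.csv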
16 changes: 13 additions & 3 deletions python/cugraph/cugraph/dask/community/louvain.py

@@ -129,9 +129,19 @@ def louvain(
     Examples
     --------
-    >>> from cugraph.datasets import karate
-    >>> G = karate.get_graph(fetch=True)
-    >>> parts = cugraph.louvain(G)
+    >>> import cugraph.dask as dcg
+    >>> import dask_cudf
+    >>> # ... Init a DASK Cluster
+    >>> #    see https://docs.rapids.ai/api/cugraph/stable/dask-cugraph.html
+    >>> # Download dataset from https://github.com/rapidsai/cugraph/datasets/..
+    >>> chunksize = dcg.get_chunksize(datasets_path / "karate.csv")
+    >>> ddf = dask_cudf.read_csv(datasets_path / "karate.csv",
+    ...                          chunksize=chunksize, delimiter=" ",
+    ...                          names=["src", "dst", "value"],
+    ...                          dtype=["int32", "int32", "float32"])
+    >>> dg = cugraph.Graph()
+    >>> dg.from_dask_cudf_edgelist(ddf, source='src', destination='dst')
+    >>> parts, modularity_score = dcg.louvain(dg)

     """
17 changes: 13 additions & 4 deletions python/cugraph/cugraph/dask/link_analysis/pagerank.py

@@ -28,7 +28,9 @@
 )

 import cugraph.dask.comms.comms as Comms
-from cugraph.dask.common.input_utils import get_distributed_data
+from cugraph.dask.common.part_utils import (
+    persist_dask_df_equal_parts_per_worker,
+)
 from cugraph.exceptions import FailedToConvergeError

@@ -352,7 +354,14 @@ def pagerank(
                 personalization, npartitions=len(Comms.get_workers())
             )

-            data_prsztn = get_distributed_data(personalization_ddf)
+            data_prsztn = persist_dask_df_equal_parts_per_worker(
+                personalization_ddf, client, return_type="dict"
+            )
+
+            empty_df = cudf.DataFrame(columns=list(personalization_ddf.columns))
+            empty_df = empty_df.astype(
+                dict(zip(personalization_ddf.columns, personalization_ddf.dtypes))
+            )

             result = [
                 client.submit(

@@ -361,7 +370,7 @@
                     input_graph._plc_graph[w],
                     precomputed_vertex_out_weight_vertices,
                     precomputed_vertex_out_weight_sums,
-                    data_personalization[0],
+                    data_personalization[0] if data_personalization else empty_df,
                     initial_guess_vertices,
                     initial_guess_values,
                     alpha,

@@ -372,7 +381,7 @@
                     workers=[w],
                     allow_other_workers=False,
                 )
-                for w, data_personalization in data_prsztn.worker_to_parts.items()
+                for w, data_personalization in data_prsztn.items()
             ]
         else:
             result = [
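
Unlike the Series-valued inputs elsewhere in this PR, personalization is a DataFrame, so the empty fallback must also reproduce the column dtypes. A standalone sketch of the idiom, with illustrative column names and dtypes:

import cudf

columns = ["vertex", "values"]   # illustrative personalization schema
dtypes = ["int32", "float32"]

empty_df = cudf.DataFrame(columns=columns)
empty_df = empty_df.astype(dict(zip(columns, dtypes)))

# Workers without a personalization partition submit empty_df, keeping
# column types consistent across all ranks of the collective call.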
18 changes: 9 additions & 9 deletions python/cugraph/cugraph/dask/sampling/random_walks.py

@@ -16,6 +16,9 @@
 import dask_cudf
 import cudf
 import operator as op
+from cugraph.dask.common.part_utils import (
+    persist_dask_df_equal_parts_per_worker,
+)

 from pylibcugraph import ResourceHandle

@@ -24,7 +27,6 @@
 )

 from cugraph.dask.comms import comms as Comms
-from cugraph.dask.common.input_utils import get_distributed_data


 def convert_to_cudf(cp_paths, number_map=None, is_vertex_paths=False):

@@ -104,7 +106,7 @@ def random_walks(
     max_path_length : int
         The maximum path length
     """
-
+    client = default_client()
     if isinstance(start_vertices, int):
         start_vertices = [start_vertices]

@@ -126,23 +128,21 @@
         start_vertices, npartitions=min(input_graph._npartitions, len(start_vertices))
     )
     start_vertices = start_vertices.astype(start_vertices_type)
-    start_vertices = get_distributed_data(start_vertices)
-    wait(start_vertices)
-    start_vertices = start_vertices.worker_to_parts

-    client = default_client()
+    start_vertices = persist_dask_df_equal_parts_per_worker(
+        start_vertices, client, return_type="dict"
+    )

     result = [
         client.submit(
             _call_plc_uniform_random_walks,
             Comms.get_session_id(),
             input_graph._plc_graph[w],
-            start_vertices[w][0],
+            start_v[0] if start_v else cudf.Series(dtype=start_vertices_type),
             max_depth,
             workers=[w],
             allow_other_workers=False,
         )
-        for w in Comms.get_workers()
+        for w, start_v in start_vertices.items()
     ]

     wait(result)
15 changes: 11 additions & 4 deletions python/cugraph/cugraph/dask/traversal/bfs.py

@@ -16,7 +16,9 @@
 from pylibcugraph import ResourceHandle, bfs as pylibcugraph_bfs

 from dask.distributed import wait, default_client
-from cugraph.dask.common.input_utils import get_distributed_data
+from cugraph.dask.common.part_utils import (
+    persist_dask_df_equal_parts_per_worker,
+)
 import cugraph.dask.comms.comms as Comms
 import cudf
 import dask_cudf

@@ -159,8 +161,13 @@ def bfs(input_graph, start, depth_limit=None, return_distances=True, check_start
         tmp_col_names = None

     start = input_graph.lookup_internal_vertex_id(start, tmp_col_names)
+    vertex_dtype = start.dtype  # if the edgelist was renumbered, update
+    # the vertex type accordingly
+
+    data_start = persist_dask_df_equal_parts_per_worker(
+        start, client, return_type="dict"
+    )

-    data_start = get_distributed_data(start)
     do_expensive_check = False
     # FIXME: Why is 'direction_optimizing' not part of the python cugraph API
     # and why is it set to 'False' by default

@@ -171,15 +178,15 @@
             _call_plc_bfs,
             Comms.get_session_id(),
             input_graph._plc_graph[w],
-            st[0],
+            st[0] if st else cudf.Series(dtype=vertex_dtype),
             depth_limit,
             direction_optimizing,
             return_distances,
             do_expensive_check,
             workers=[w],
             allow_other_workers=False,
         )
-        for w, st in data_start.worker_to_parts.items()
+        for w, st in data_start.items()
     ]

     wait(cupy_result)
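
Capturing vertex_dtype after lookup_internal_vertex_id matters because renumbering can change the vertex type, and the empty fallback must match the internal dtype rather than the user-facing one. An illustrative sketch with hypothetical values:

import cudf

user_start = cudf.Series([101, 205], dtype="int64")   # external labels
internal_start = cudf.Series([0, 1], dtype="int32")   # after renumbering

# Every rank must submit a Series of the internal type, even when empty.
vertex_dtype = internal_start.dtype
empty_fallback = cudf.Series(dtype=vertex_dtype)
assert empty_fallback.dtype == vertex_dtype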