Merge remote-tracking branch 'upstream/branch-23.12' into branch-24.02-merge-23.12
rlratzel committed Nov 27, 2023
2 parents 0a29752 + 3116eed commit 5047ffd
Showing 49 changed files with 521 additions and 298 deletions.
2 changes: 1 addition & 1 deletion ci/test_python.sh
@@ -79,7 +79,7 @@ pytest \
--cov=cugraph \
--cov-report=xml:"${RAPIDS_COVERAGE_DIR}/cugraph-coverage.xml" \
--cov-report=term \
-k "not _mg" \
-k "not test_property_graph_mg" \
tests
popd

2 changes: 1 addition & 1 deletion ci/test_wheel.sh
@@ -26,5 +26,5 @@ else
DASK_DISTRIBUTED__SCHEDULER__WORKER_TTL="1000s" \
DASK_DISTRIBUTED__COMM__TIMEOUTS__CONNECT="1000s" \
DASK_CUDA_WAIT_WORKERS_MIN_TIMEOUT="1000s" \
- python -m pytest -k "not _mg" ./python/${package_name}/${python_package_name}/tests
+ python -m pytest ./python/${package_name}/${python_package_name}/tests
fi
3 changes: 2 additions & 1 deletion cpp/include/cugraph_c/graph.h
@@ -103,7 +103,8 @@ cugraph_error_code_t cugraph_sg_graph_create(
* Note that setting this flag will arbitrarily select one instance of a multi edge to be the
* edge that survives. If the edges have properties that should be honored (e.g. sum the weights,
- * or take the maximum weight), the caller should do that on not rely on this flag.
+ * or take the maximum weight), the caller should remove specific edges themselves and not rely
+ * on this flag.
* @param [in] do_expensive_check If true, do expensive checks to validate the input data
* is consistent with software assumptions. If false bypass these checks.
* @param [out] graph A pointer to the graph object
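The clarified comment above asks callers to collapse multi edges themselves when edge properties matter. A minimal cudf sketch of that pre-aggregation, with assumed column names `src`, `dst`, and `weight` (they are not part of the C API):

```python
import cudf

# Hypothetical edge list containing multi (parallel) edges; column names are assumed.
edges = cudf.DataFrame(
    {
        "src": [0, 0, 1, 2],
        "dst": [1, 1, 2, 0],
        "weight": [1.0, 2.5, 0.5, 3.0],
    }
)

# Collapse multi edges explicitly (here by summing their weights) before creating the
# graph, instead of letting drop_multi_edges keep an arbitrary surviving edge.
deduped = edges.groupby(["src", "dst"], as_index=False).agg({"weight": "sum"})
```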
14 changes: 6 additions & 8 deletions python/cugraph/cugraph/dask/community/egonet.py
@@ -18,7 +18,9 @@
import cugraph.dask.comms.comms as Comms
import dask_cudf
import cudf
- from cugraph.dask.common.input_utils import get_distributed_data
+ from cugraph.dask.common.part_utils import (
+     persist_dask_df_equal_parts_per_worker,
+ )

from pylibcugraph import ResourceHandle, ego_graph as pylibcugraph_ego_graph

@@ -135,25 +137,21 @@ def ego_graph(input_graph, n, radius=1, center=True):
n = dask_cudf.from_cudf(n, npartitions=min(input_graph._npartitions, len(n)))
n = n.astype(n_type)

- n = get_distributed_data(n)
- wait(n)

- n = n.worker_to_parts

+ n = persist_dask_df_equal_parts_per_worker(n, client, return_type="dict")
do_expensive_check = False

result = [
client.submit(
_call_ego_graph,
Comms.get_session_id(),
input_graph._plc_graph[w],
- n[w][0],
+ n_[0] if n_ else cudf.Series(dtype=n_type),
radius,
do_expensive_check,
workers=[w],
allow_other_workers=False,
)
- for w in Comms.get_workers()
+ for w, n_ in n.items()
]
wait(result)

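The new dispatch pattern above relies on persist_dask_df_equal_parts_per_worker(..., return_type="dict") returning a mapping from worker address to a possibly empty list of persisted partitions. A minimal sketch of the fallback expression, using a made-up worker mapping for illustration:

```python
import cudf

n_type = "int32"

# Hypothetical {worker: [partitions]} mapping, analogous to what
# persist_dask_df_equal_parts_per_worker(..., return_type="dict") yields.
parts = {
    "tcp://127.0.0.1:40001": [cudf.Series([0, 1, 2], dtype=n_type)],
    "tcp://127.0.0.1:40002": [],  # this worker received no partition of the input
}

for worker, n_ in parts.items():
    # Same fallback as in ego_graph: every worker still gets a task, but a worker
    # without a partition is handed an empty Series of the expected dtype.
    payload = n_[0] if n_ else cudf.Series(dtype=n_type)
    print(worker, len(payload))
```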
19 changes: 9 additions & 10 deletions python/cugraph/cugraph/dask/community/induced_subgraph.py
@@ -19,7 +19,9 @@
import dask_cudf
import cudf
import cupy as cp
- from cugraph.dask.common.input_utils import get_distributed_data
+ from cugraph.dask.common.part_utils import (
+     persist_dask_df_equal_parts_per_worker,
+ )
from typing import Union, Tuple

from pylibcugraph import (
@@ -154,15 +156,12 @@ def induced_subgraph(
vertices_type = input_graph.input_df.dtypes[0]

if isinstance(vertices, (cudf.Series, cudf.DataFrame)):
- vertices = dask_cudf.from_cudf(
-     vertices, npartitions=min(input_graph._npartitions, len(vertices))
- )
+ vertices = dask_cudf.from_cudf(vertices, npartitions=input_graph._npartitions)
vertices = vertices.astype(vertices_type)

- vertices = get_distributed_data(vertices)
- wait(vertices)

- vertices = vertices.worker_to_parts
+ vertices = persist_dask_df_equal_parts_per_worker(
+     vertices, client, return_type="dict"
+ )

do_expensive_check = False

@@ -171,13 +170,13 @@
_call_induced_subgraph,
Comms.get_session_id(),
input_graph._plc_graph[w],
- vertices[w][0],
+ vertices_[0] if vertices_ else cudf.Series(dtype=vertices_type),
offsets,
do_expensive_check,
workers=[w],
allow_other_workers=False,
)
- for w in Comms.get_workers()
+ for w, vertices_ in vertices.items()
]
wait(result)

16 changes: 13 additions & 3 deletions python/cugraph/cugraph/dask/community/leiden.py
@@ -125,9 +125,19 @@ def leiden(
Examples
--------
- >>> from cugraph.datasets import karate
- >>> G = karate.get_graph(fetch=True)
- >>> parts, modularity_score = cugraph.leiden(G)
+ >>> import cugraph.dask as dcg
+ >>> import dask_cudf
+ >>> # ... Init a DASK Cluster
+ >>> # see https://docs.rapids.ai/api/cugraph/stable/dask-cugraph.html
+ >>> # Download dataset from https://github.com/rapidsai/cugraph/datasets/..
+ >>> chunksize = dcg.get_chunksize(datasets_path / "karate.csv")
+ >>> ddf = dask_cudf.read_csv(datasets_path / "karate.csv",
+ ...                          chunksize=chunksize, delimiter=" ",
+ ...                          names=["src", "dst", "value"],
+ ...                          dtype=["int32", "int32", "float32"])
+ >>> dg = cugraph.Graph()
+ >>> dg.from_dask_cudf_edgelist(ddf, source='src', destination='dst')
+ >>> parts, modularity_score = dcg.leiden(dg)
"""

16 changes: 13 additions & 3 deletions python/cugraph/cugraph/dask/community/louvain.py
@@ -129,9 +129,19 @@ def louvain(
Examples
--------
- >>> from cugraph.datasets import karate
- >>> G = karate.get_graph(fetch=True)
- >>> parts = cugraph.louvain(G)
+ >>> import cugraph.dask as dcg
+ >>> import dask_cudf
+ >>> # ... Init a DASK Cluster
+ >>> # see https://docs.rapids.ai/api/cugraph/stable/dask-cugraph.html
+ >>> # Download dataset from https://github.com/rapidsai/cugraph/datasets/..
+ >>> chunksize = dcg.get_chunksize(datasets_path / "karate.csv")
+ >>> ddf = dask_cudf.read_csv(datasets_path / "karate.csv",
+ ...                          chunksize=chunksize, delimiter=" ",
+ ...                          names=["src", "dst", "value"],
+ ...                          dtype=["int32", "int32", "float32"])
+ >>> dg = cugraph.Graph()
+ >>> dg.from_dask_cudf_edgelist(ddf, source='src', destination='dst')
+ >>> parts, modularity_score = dcg.louvain(dg)
"""

17 changes: 13 additions & 4 deletions python/cugraph/cugraph/dask/link_analysis/pagerank.py
@@ -28,7 +28,9 @@
)

import cugraph.dask.comms.comms as Comms
- from cugraph.dask.common.input_utils import get_distributed_data
+ from cugraph.dask.common.part_utils import (
+     persist_dask_df_equal_parts_per_worker,
+ )
from cugraph.exceptions import FailedToConvergeError


@@ -352,7 +354,14 @@ def pagerank(
personalization, npartitions=len(Comms.get_workers())
)

- data_prsztn = get_distributed_data(personalization_ddf)
+ data_prsztn = persist_dask_df_equal_parts_per_worker(
+     personalization_ddf, client, return_type="dict"
+ )

+ empty_df = cudf.DataFrame(columns=list(personalization_ddf.columns))
+ empty_df = empty_df.astype(
+     dict(zip(personalization_ddf.columns, personalization_ddf.dtypes))
+ )

result = [
client.submit(
@@ -361,7 +370,7 @@
input_graph._plc_graph[w],
precomputed_vertex_out_weight_vertices,
precomputed_vertex_out_weight_sums,
- data_personalization[0],
+ data_personalization[0] if data_personalization else empty_df,
initial_guess_vertices,
initial_guess_values,
alpha,
@@ -372,7 +381,7 @@
workers=[w],
allow_other_workers=False,
)
- for w, data_personalization in data_prsztn.worker_to_parts.items()
+ for w, data_personalization in data_prsztn.items()
]
else:
result = [
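The empty_df fallback introduced above exists so that a worker holding no personalization partition still receives a zero-row frame with the expected columns and dtypes. A standalone sketch of that construction, with assumed vertex/values columns:

```python
import cudf

# Stand-in for personalization_ddf's schema; column names and dtypes are assumptions.
personalization = cudf.DataFrame(
    {
        "vertex": cudf.Series([0, 1], dtype="int32"),
        "values": cudf.Series([0.6, 0.4], dtype="float32"),
    }
)

# Zero-row frame that still carries the columns and their dtypes.
empty_df = cudf.DataFrame(columns=list(personalization.columns))
empty_df = empty_df.astype(dict(zip(personalization.columns, personalization.dtypes)))

print(len(empty_df))    # 0
print(empty_df.dtypes)  # vertex: int32, values: float32
```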
18 changes: 9 additions & 9 deletions python/cugraph/cugraph/dask/sampling/random_walks.py
@@ -16,6 +16,9 @@
import dask_cudf
import cudf
import operator as op
+ from cugraph.dask.common.part_utils import (
+     persist_dask_df_equal_parts_per_worker,
+ )

from pylibcugraph import ResourceHandle

@@ -24,7 +27,6 @@
)

from cugraph.dask.comms import comms as Comms
- from cugraph.dask.common.input_utils import get_distributed_data


def convert_to_cudf(cp_paths, number_map=None, is_vertex_paths=False):
@@ -104,7 +106,7 @@ def random_walks(
max_path_length : int
The maximum path length
"""

+ client = default_client()
if isinstance(start_vertices, int):
start_vertices = [start_vertices]

@@ -126,23 +128,21 @@
start_vertices, npartitions=min(input_graph._npartitions, len(start_vertices))
)
start_vertices = start_vertices.astype(start_vertices_type)
- start_vertices = get_distributed_data(start_vertices)
- wait(start_vertices)
- start_vertices = start_vertices.worker_to_parts

- client = default_client()
+ start_vertices = persist_dask_df_equal_parts_per_worker(
+     start_vertices, client, return_type="dict"
+ )

result = [
client.submit(
_call_plc_uniform_random_walks,
Comms.get_session_id(),
input_graph._plc_graph[w],
- start_vertices[w][0],
+ start_v[0] if start_v else cudf.Series(dtype=start_vertices_type),
max_depth,
workers=[w],
allow_other_workers=False,
)
- for w in Comms.get_workers()
+ for w, start_v in start_vertices.items()
]

wait(result)
15 changes: 11 additions & 4 deletions python/cugraph/cugraph/dask/traversal/bfs.py
@@ -16,7 +16,9 @@
from pylibcugraph import ResourceHandle, bfs as pylibcugraph_bfs

from dask.distributed import wait, default_client
- from cugraph.dask.common.input_utils import get_distributed_data
+ from cugraph.dask.common.part_utils import (
+     persist_dask_df_equal_parts_per_worker,
+ )
import cugraph.dask.comms.comms as Comms
import cudf
import dask_cudf
@@ -159,8 +161,13 @@ def bfs(input_graph, start, depth_limit=None, return_distances=True, check_start
tmp_col_names = None

start = input_graph.lookup_internal_vertex_id(start, tmp_col_names)
+ vertex_dtype = start.dtype  # if the edgelist was renumbered, update
+ # the vertex type accordingly

+ data_start = persist_dask_df_equal_parts_per_worker(
+     start, client, return_type="dict"
+ )

- data_start = get_distributed_data(start)
do_expensive_check = False
# FIXME: Why is 'direction_optimizing' not part of the python cugraph API
# and why is it set to 'False' by default
@@ -171,15 +178,15 @@
_call_plc_bfs,
Comms.get_session_id(),
input_graph._plc_graph[w],
- st[0],
+ st[0] if st else cudf.Series(dtype=vertex_dtype),
depth_limit,
direction_optimizing,
return_distances,
do_expensive_check,
workers=[w],
allow_other_workers=False,
)
- for w, st in data_start.worker_to_parts.items()
+ for w, st in data_start.items()
]

wait(cupy_result)