Skip to content

Commit

Permalink
Optimize the drop-duplicate functionality (#4095)
Browse files Browse the repository at this point in the history
Our current Python API leverages Dask to implement the `drop-duplicate` functionality, but this carries a lot of overhead: it consumes a significant amount of host memory and results in a crash when processing large graphs (4+ billion edges).

This PR
1. Leverages the CAPI to internally drop multi edges when creating the PLC graph.
2. Deprecates the parameter `multi`, which, when set to False, triggers the Dask-based `drop-duplicate` functionality.
3. Adds the flag `do_expensive_check` to check for `NULL` values in the edgelist.

Authors:
  - Joseph Nke (https://github.com/jnke2016)

Approvers:
  - Vibhu Jawa (https://github.com/VibhuJawa)
  - Rick Ratzel (https://github.com/rlratzel)

URL: #4095
  • Loading branch information
jnke2016 authored Feb 2, 2024
1 parent 20f7dca commit 3d52f17
Show file tree
Hide file tree
Showing 5 changed files with 73 additions and 11 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@
)
from cugraph.dask.common.mg_utils import run_gc_on_dask_cluster
import cugraph.dask.comms.comms as Comms
from cugraph.structure.symmetrize import _memory_efficient_drop_duplicates


class simpleDistributedGraphImpl:
Expand Down Expand Up @@ -95,6 +96,7 @@ def _make_plc_graph(
weight_type,
edge_id_type,
edge_type_id,
drop_multi_edges,
):
weights = None
edge_ids = None
Expand Down Expand Up @@ -149,6 +151,7 @@ def _make_plc_graph(
num_arrays=num_arrays,
store_transposed=store_transposed,
do_expensive_check=False,
drop_multi_edges=drop_multi_edges,
)
del edata_x
gc.collect()
Expand Down Expand Up @@ -267,7 +270,7 @@ def __from_edgelist(
input_ddf,
source,
destination,
multi=self.properties.multi_edge,
multi=True, # Deprecated parameter
symmetrize=not self.properties.directed,
)
value_col = None
Expand All @@ -277,7 +280,7 @@ def __from_edgelist(
source,
destination,
value_col_names,
multi=self.properties.multi_edge,
multi=True, # Deprecated parameter
symmetrize=not self.properties.directed,
)

Expand Down Expand Up @@ -364,6 +367,7 @@ def __from_edgelist(
self.weight_type,
self.edge_id_type,
self.edge_type_id_type,
not self.properties.multi_edge,
)
for w, edata in persisted_keys_d.items()
}
Expand Down Expand Up @@ -455,6 +459,15 @@ def view_edge_list(self):
else:
is_multi_column = True

if not self.properties.multi_edge:
# Drop parallel edges for non MultiGraph
# FIXME: Drop multi edges with the CAPI instead.
_client = default_client()
workers = _client.scheduler_info()["workers"]
edgelist_df = _memory_efficient_drop_duplicates(
edgelist_df, [srcCol, dstCol], len(workers)
)

edgelist_df[srcCol], edgelist_df[dstCol] = edgelist_df[
[srcCol, dstCol]
].min(axis=1), edgelist_df[[srcCol, dstCol]].max(axis=1)
Expand Down
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
# Copyright (c) 2021-2023, NVIDIA CORPORATION.
# Copyright (c) 2021-2024, NVIDIA CORPORATION.
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
Expand Down Expand Up @@ -264,7 +264,7 @@ def __from_edgelist(
source,
destination,
edge_attr,
multi=self.properties.multi_edge,
multi=self.properties.multi_edge, # Deprecated parameter
symmetrize=not self.properties.directed,
)

Expand All @@ -279,7 +279,7 @@ def __from_edgelist(
elist,
source,
destination,
multi=self.properties.multi_edge,
multi=self.properties.multi_edge, # Deprecated parameter
symmetrize=not self.properties.directed,
)

Expand All @@ -298,7 +298,10 @@ def __from_edgelist(
self._replicate_edgelist()

self._make_plc_graph(
value_col=value_col, store_transposed=store_transposed, renumber=renumber
value_col=value_col,
store_transposed=store_transposed,
renumber=renumber,
drop_multi_edges=not self.properties.multi_edge,
)

def to_pandas_edgelist(
Expand Down Expand Up @@ -477,13 +480,15 @@ def view_edge_list(self):
edgelist_df[simpleGraphImpl.srcCol]
<= edgelist_df[simpleGraphImpl.dstCol]
]

elif not use_initial_input_df and self.properties.renumbered:
# Do not unrenumber the vertices if the initial input df was used
if not self.properties.directed:
edgelist_df = edgelist_df[
edgelist_df[simpleGraphImpl.srcCol]
<= edgelist_df[simpleGraphImpl.dstCol]
]

edgelist_df = self.renumber_map.unrenumber(
edgelist_df, simpleGraphImpl.srcCol
)
Expand Down Expand Up @@ -1084,6 +1089,7 @@ def _make_plc_graph(
value_col: Dict[str, cudf.DataFrame] = None,
store_transposed: bool = False,
renumber: bool = True,
drop_multi_edges: bool = False,
):
"""
Parameters
Expand All @@ -1100,6 +1106,8 @@ def _make_plc_graph(
Whether to renumber the vertices of the graph.
Required if inputted vertex ids are not of
int32 or int64 type.
drop_multi_edges: bool (default=False)
Whether to drop multi edges
"""

if value_col is None:
Expand Down Expand Up @@ -1163,6 +1171,7 @@ def _make_plc_graph(
renumber=renumber,
do_expensive_check=True,
input_array_format=input_array_format,
drop_multi_edges=drop_multi_edges,
)

def to_directed(self, DiG, store_transposed=False):
Expand Down
37 changes: 34 additions & 3 deletions python/cugraph/cugraph/structure/symmetrize.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
# Copyright (c) 2019-2023, NVIDIA CORPORATION.
# Copyright (c) 2019-2024, NVIDIA CORPORATION.
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
Expand All @@ -15,6 +15,7 @@
import cudf
import dask_cudf
from dask.distributed import default_client
import warnings


def symmetrize_df(
Expand Down Expand Up @@ -54,6 +55,11 @@ def symmetrize_df(
Name of the column in the data frame containing the weight ids
multi : bool, optional (default=False)
[Deprecated: `multi` will be removed in a future version, and the removal
of multi edges will no longer be supported from 'symmetrize'.
Multi edges will be removed upon creation of the graph instance directly,
based on whether the graph is a `cugraph.MultiGraph` or a `cugraph.Graph`.]
Set to True if graph is a Multi(Di)Graph. This allows multiple
edges instead of dropping them.
Expand Down Expand Up @@ -84,6 +90,12 @@ def symmetrize_df(
if multi:
return result
else:
warnings.warn(
"Multi is deprecated and the removal of multi edges will no longer be "
"supported from 'symmetrize'. Multi edges will be removed upon creation "
"of graph instance.",
FutureWarning,
)
vertex_col_name = src_name + dst_name
result = result.groupby(by=[*vertex_col_name], as_index=False).min()
return result
Expand Down Expand Up @@ -128,6 +140,11 @@ def symmetrize_ddf(
Name of the column in the data frame containing the weight ids
multi : bool, optional (default=False)
[Deprecated: `multi` will be removed in a future version, and the removal
of multi edges will no longer be supported from 'symmetrize'.
Multi edges will be removed upon creation of the graph instance directly,
based on whether the graph is a `cugraph.MultiGraph` or a `cugraph.Graph`.]
Set to True if graph is a Multi(Di)Graph. This allows multiple
edges instead of dropping them.
Expand Down Expand Up @@ -165,8 +182,15 @@ def symmetrize_ddf(
else:
result = ddf
if multi:
result = result.reset_index(drop=True).repartition(npartitions=len(workers) * 2)
return result
else:
warnings.warn(
"Multi is deprecated and the removal of multi edges will no longer be "
"supported from 'symmetrize'. Multi edges will be removed upon creation "
"of graph instance.",
FutureWarning,
)
vertex_col_name = src_name + dst_name
result = _memory_efficient_drop_duplicates(
result, vertex_col_name, len(workers)
Expand All @@ -181,6 +205,7 @@ def symmetrize(
value_col_name=None,
multi=False,
symmetrize=True,
do_expensive_check=False,
):
"""
Take a dataframe of source destination pairs along with associated
Expand Down Expand Up @@ -208,6 +233,11 @@ def symmetrize(
weights column name.
multi : bool, optional (default=False)
[Deprecated: `multi` will be removed in a future version, and the removal
of multi edges will no longer be supported from 'symmetrize'.
Multi edges will be removed upon creation of the graph instance directly,
based on whether the graph is a `cugraph.MultiGraph` or a `cugraph.Graph`.]
Set to True if graph is a Multi(Di)Graph. This allows multiple
edges instead of dropping them.
Expand All @@ -234,8 +264,9 @@ def symmetrize(
if "edge_id" in input_df.columns and symmetrize:
raise ValueError("Edge IDs are not supported on undirected graphs")

csg.null_check(input_df[source_col_name])
csg.null_check(input_df[dest_col_name])
if do_expensive_check: # FIXME: Optimize this check as it is currently expensive
csg.null_check(input_df[source_col_name])
csg.null_check(input_df[dest_col_name])

if isinstance(input_df, dask_cudf.DataFrame):
output_df = symmetrize_ddf(
Expand Down
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
# Copyright (c) 2022-2023, NVIDIA CORPORATION.
# Copyright (c) 2022-2024, NVIDIA CORPORATION.
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
Expand Down Expand Up @@ -26,6 +26,7 @@
from cugraph.testing import UNDIRECTED_DATASETS
from cugraph.dask import uniform_neighbor_sample
from cugraph.dask.common.mg_utils import is_single_gpu
from cugraph.structure.symmetrize import _memory_efficient_drop_duplicates
from cugraph.datasets import email_Eu_core, small_tree
from pylibcugraph.testing.utils import gen_fixture_params_product

Expand Down Expand Up @@ -135,6 +136,14 @@ def test_mg_uniform_neighbor_sample_simple(dask_client, input_combo):
dg = input_combo["MGGraph"]

input_df = dg.input_df
# Drop parallel edges for non MultiGraph
# FIXME: Drop multi edges with the CAPI instead.
vertex_col_name = ["src", "dst"]
workers = dask_client.scheduler_info()["workers"]
input_df = _memory_efficient_drop_duplicates(
input_df, vertex_col_name, len(workers)
)

result_nbr = uniform_neighbor_sample(
dg,
input_combo["start_list"],
Expand Down
2 changes: 1 addition & 1 deletion python/pylibcugraph/pylibcugraph/graphs.pyx
Original file line number Diff line number Diff line change
Expand Up @@ -463,9 +463,9 @@ cdef class MGGraph(_GPUGraph):
edge_type_view_ptr_ptr,
store_transposed,
num_arrays,
do_expensive_check,
drop_self_loops,
drop_multi_edges,
do_expensive_check,
&(self.c_graph_ptr),
&error_ptr)

Expand Down

0 comments on commit 3d52f17

Please sign in to comment.