From 374b103c0b84f4a83e96844c859e5aebbd108758 Mon Sep 17 00:00:00 2001 From: Alexandria Barghi Date: Wed, 20 Sep 2023 07:37:54 -0700 Subject: [PATCH] refactor code into new shared utility --- .../dask/sampling/uniform_neighbor_sample.py | 206 ++++++++++++------ .../cugraph/sampling/sampling_utilities.py | 175 +++++++++++++++ .../sampling/uniform_neighbor_sample.py | 159 ++------------ .../test_uniform_neighbor_sample_mg.py | 1 + 4 files changed, 331 insertions(+), 210 deletions(-) create mode 100644 python/cugraph/cugraph/sampling/sampling_utilities.py diff --git a/python/cugraph/cugraph/dask/sampling/uniform_neighbor_sample.py b/python/cugraph/cugraph/dask/sampling/uniform_neighbor_sample.py index 9e50169b4a7..51372912120 100644 --- a/python/cugraph/cugraph/dask/sampling/uniform_neighbor_sample.py +++ b/python/cugraph/cugraph/dask/sampling/uniform_neighbor_sample.py @@ -122,100 +122,163 @@ def create_empty_df_with_edge_props( def convert_to_cudf( - cp_arrays, weight_t, with_edge_properties, return_offsets=False, renumber=False + cupy_array_dict, weight_t, num_hops, with_edge_properties=False, return_offsets=False, renumber=False, use_legacy_names=True,include_hop_column=True, ): """ Creates a cudf DataFrame from cupy arrays from pylibcugraph wrapper """ - df = cudf.DataFrame() + results_df = cudf.DataFrame() + + if use_legacy_names: + major_col_name = "sources" + minor_col_name = "destinations" + warning_msg = ( + "The legacy column names (sources, destinations)" + " will no longer be supported for uniform_neighbor_sample" + " in release 23.12. The use_legacy_names=False option will" + " become the only option, and (majors, minors) will be the" + " only supported column names." + ) + warnings.warn(warning_msg, FutureWarning) + else: + major_col_name = "majors" + minor_col_name = "minors" if with_edge_properties: - if renumber: - ( - sources, - destinations, - weights, - edge_ids, - edge_types, - batch_ids, - offsets, - hop_ids, - renumber_map, - renumber_map_offsets, - ) = cp_arrays - else: - ( - sources, - destinations, - weights, - edge_ids, - edge_types, - batch_ids, - offsets, - hop_ids, - ) = cp_arrays + results_df_cols = [ + 'majors', + 'minors', + 'weight', + 'edge_id', + 'edge_type', + 'hop_id' + ] + + for col in results_df_cols: + array = cupy_array_dict[col] + if array is not None: + # The length of each of these arrays should be the same + results_df[col] = array + + results_df.rename(columns={'majors':major_col_name, 'minors':minor_col_name},inplace=True) + + label_hop_offsets = cupy_array_dict['label_hop_offsets'] + batch_ids = cupy_array_dict['batch_id'] - df[src_n] = sources - df[dst_n] = destinations - df[weight_n] = weights - df[edge_id_n] = edge_ids - df[edge_type_n] = edge_types - df[hop_id_n] = hop_ids + if renumber: + renumber_df = cudf.DataFrame({ + 'map': cupy_array_dict['renumber_map'], + }) - return_dfs = [df] + if not return_offsets: + if len(batch_ids) > 0: + batch_ids_r = cudf.Series(batch_ids).repeat( + cp.diff(cupy_array_dict['renumber_map_offsets']) + ) + batch_ids_r.reset_index(drop=True, inplace=True) + renumber_df["batch_id"] = batch_ids_r + else: + renumber_df['batch_id'] = None if return_offsets: - offsets_df = cudf.DataFrame( - { - batch_id_n: batch_ids, - offsets_n: offsets[:-1], - } + batches_series = cudf.Series( + batch_ids, + name="batch_id", ) + if include_hop_column: + # TODO remove this logic in release 23.12 + offsets_df = cudf.Series( + label_hop_offsets[cp.arange(len(batch_ids)+1) * num_hops], + name='offsets', + ).to_frame() + else: + 
offsets_df = cudf.Series( + label_hop_offsets, + name="offsets", + ).to_frame() + + if len(batches_series) > len(offsets_df): + # this is extremely rare so the inefficiency is ok + offsets_df = offsets_df.join(batches_series, how='outer').sort_index() + else: + offsets_df['batch_id'] = batches_series if renumber: - offsets_df[map_offsets_n] = renumber_map_offsets[:-1] - - return_dfs.append(offsets_df) - else: - batch_ids_b = batch_ids - if len(batch_ids_b) > 0: - batch_ids_b = cudf.Series(batch_ids_b).repeat(cp.diff(offsets)) - batch_ids_b.reset_index(drop=True, inplace=True) - - df[batch_id_n] = batch_ids_b + renumber_offset_series = cudf.Series( + cupy_array_dict['renumber_map_offsets'], + name="renumber_map_offsets" + ) - if renumber: - renumber_df = cudf.DataFrame( - { - "map": renumber_map, - } - ) + if len(renumber_offset_series) > len(renumber_df): + # this is extremely rare so the inefficiency is ok + renumber_df = renumber_df.join(renumber_offset_series, how='outer').sort_index() + else: + renumber_df['renumber_map_offsets'] = renumber_offset_series - if not return_offsets: - batch_ids_r = cudf.Series(batch_ids).repeat( - cp.diff(renumber_map_offsets) - ) + else: + if len(batch_ids) > 0: + batch_ids_r = cudf.Series(cp.repeat(batch_ids, num_hops)) + batch_ids_r = cudf.Series(batch_ids_r).repeat(cp.diff(label_hop_offsets)) batch_ids_r.reset_index(drop=True, inplace=True) - renumber_df["batch_id"] = batch_ids_r - return_dfs.append(renumber_df) + results_df["batch_id"] = batch_ids_r + else: + results_df['batch_id'] = None + + # TODO remove this logic in release 23.12, hops will always returned as offsets + if include_hop_column: + if len(batch_ids) > 0: + hop_ids_r = cudf.Series(cp.arange(num_hops)) + hop_ids_r = cudf.concat([hop_ids_r] * len(batch_ids),ignore_index=True) + + # generate the hop column + hop_ids_r = cudf.Series(hop_ids_r, name='hop_id').repeat( + cp.diff(label_hop_offsets) + ).reset_index(drop=True) + else: + hop_ids_r = cudf.Series(name='hop_id', dtype='int32') + + results_df = results_df.join(hop_ids_r, how='outer').sort_index() + + if major_col_name not in results_df: + if use_legacy_names: + raise ValueError("Can't use legacy names with major offsets") + + major_offsets_series = cudf.Series(cupy_array_dict['major_offsets'], name='major_offsets') + if len(major_offsets_series) > len(results_df): + # this is extremely rare so the inefficiency is ok + results_df = results_df.join(major_offsets_series, how='outer').sort_index() + else: + results_df['major_offsets'] = major_offsets_series - return tuple(return_dfs) else: - cupy_sources, cupy_destinations, cupy_indices = cp_arrays + # TODO this is deprecated, remove it in 23.12 - df[src_n] = cupy_sources - df[dst_n] = cupy_destinations - df[indices_n] = cupy_indices + results_df[major_col_name] = cupy_array_dict['sources'] + results_df[minor_col_name] = cupy_array_dict['destinations'] + indices = cupy_array_dict['indices'] - if cupy_indices is not None: + if indices is None: + results_df["indices"] = None + else: + results_df["indices"] = indices if weight_t == "int32": - df.indices = df.indices.astype("int32") + results_df["indices"] = indices.astype("int32") elif weight_t == "int64": - df.indices = df.indices.astype("int64") + results_df["indices"] = indices.astype("int64") + else: + results_df["indices"] = indices + + if return_offsets: + if renumber: + return results_df, offsets_df, renumber_df + else: + return results_df, offsets_df - return (df,) + if renumber: + return results_df, renumber_df + return results_df 
 def __get_label_to_output_comm_rank(min_batch_id, max_batch_id, n_workers):
     num_batches = max_batch_id - min_batch_id + 1
@@ -259,7 +322,7 @@ def _call_plc_uniform_neighbor_sample(
         min_batch_id, max_batch_id, n_workers
     )
 
-    cp_arrays = pylibcugraph_uniform_neighbor_sample(
+    cp_array_dict = pylibcugraph_uniform_neighbor_sample(
         resource_handle=ResourceHandle(Comms.get_handle(sID).getHandle()),
         input_graph=mg_graph_x,
         start_list=start_list_x,
@@ -275,9 +338,11 @@ def _call_plc_uniform_neighbor_sample(
         deduplicate_sources=deduplicate_sources,
         return_hops=return_hops,
         renumber=renumber,
+        return_dict=True,
     )
     return convert_to_cudf(
-        cp_arrays,
+        cp_array_dict,
         weight_t,
-        with_edge_properties,
+        num_hops=len(fanout_vals),
+        with_edge_properties=with_edge_properties,
         return_offsets=return_offsets,
diff --git a/python/cugraph/cugraph/sampling/sampling_utilities.py b/python/cugraph/cugraph/sampling/sampling_utilities.py
new file mode 100644
index 00000000000..1ebb23f6449
--- /dev/null
+++ b/python/cugraph/cugraph/sampling/sampling_utilities.py
@@ -0,0 +1,175 @@
+# Copyright (c) 2023, NVIDIA CORPORATION.
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+import cupy
+import cudf
+
+import warnings
+
+def sampling_results_from_cupy_array_dict(cupy_array_dict, weight_t, num_hops, with_edge_properties=False, return_offsets=False, renumber=False, use_legacy_names=True, include_hop_column=True,
+):
+    """
+    Creates cudf DataFrames from the dict of cupy arrays returned by the pylibcugraph sampling wrapper.
+    """
+    results_df = cudf.DataFrame()
+
+    if use_legacy_names:
+        major_col_name = "sources"
+        minor_col_name = "destinations"
+        warning_msg = (
+            "The legacy column names (sources, destinations)"
+            " will no longer be supported for uniform_neighbor_sample"
+            " in release 23.12. The use_legacy_names=False option will"
+            " become the only option, and (majors, minors) will be the"
+            " only supported column names."
+        )
+        warnings.warn(warning_msg, FutureWarning)
+    else:
+        major_col_name = "majors"
+        minor_col_name = "minors"
+
+    if with_edge_properties:
+        results_df_cols = [
+            'majors',
+            'minors',
+            'weight',
+            'edge_id',
+            'edge_type',
+            'hop_id'
+        ]
+
+        for col in results_df_cols:
+            array = cupy_array_dict[col]
+            if array is not None:
+                # The length of each of these arrays should be the same
+                results_df[col] = array
+
+        results_df.rename(columns={'majors': major_col_name, 'minors': minor_col_name}, inplace=True)
+
+        label_hop_offsets = cupy_array_dict['label_hop_offsets']
+        batch_ids = cupy_array_dict['batch_id']
+
+        if renumber:
+            renumber_df = cudf.DataFrame({
+                'map': cupy_array_dict['renumber_map'],
+            })
+
+            if not return_offsets:
+                if len(batch_ids) > 0:
+                    batch_ids_r = cudf.Series(batch_ids).repeat(
+                        cupy.diff(cupy_array_dict['renumber_map_offsets'])
+                    )
+                    batch_ids_r.reset_index(drop=True, inplace=True)
+                    renumber_df["batch_id"] = batch_ids_r
+                else:
+                    renumber_df['batch_id'] = None
+
+        if return_offsets:
+            batches_series = cudf.Series(
+                batch_ids,
+                name="batch_id",
+            )
+            if include_hop_column:
+                # TODO remove this logic in release 23.12
+                offsets_df = cudf.Series(
+                    label_hop_offsets[cupy.arange(len(batch_ids)+1) * num_hops],
+                    name='offsets',
+                ).to_frame()
+            else:
+                offsets_df = cudf.Series(
+                    label_hop_offsets,
+                    name="offsets",
+                ).to_frame()
+
+            if len(batches_series) > len(offsets_df):
+                # this is extremely rare so the inefficiency is ok
+                offsets_df = offsets_df.join(batches_series, how='outer').sort_index()
+            else:
+                offsets_df['batch_id'] = batches_series
+
+            if renumber:
+                renumber_offset_series = cudf.Series(
+                    cupy_array_dict['renumber_map_offsets'],
+                    name="renumber_map_offsets"
+                )
+
+                if len(renumber_offset_series) > len(renumber_df):
+                    # this is extremely rare so the inefficiency is ok
+                    renumber_df = renumber_df.join(renumber_offset_series, how='outer').sort_index()
+                else:
+                    renumber_df['renumber_map_offsets'] = renumber_offset_series
+
+        else:
+            if len(batch_ids) > 0:
+                batch_ids_r = cudf.Series(cupy.repeat(batch_ids, num_hops))
+                batch_ids_r = cudf.Series(batch_ids_r).repeat(cupy.diff(label_hop_offsets))
+                batch_ids_r.reset_index(drop=True, inplace=True)
+
+                results_df["batch_id"] = batch_ids_r
+            else:
+                results_df['batch_id'] = None
+
+        # TODO remove this logic in release 23.12, hops will always be returned as offsets
+        if include_hop_column:
+            if len(batch_ids) > 0:
+                hop_ids_r = cudf.Series(cupy.arange(num_hops))
+                hop_ids_r = cudf.concat([hop_ids_r] * len(batch_ids), ignore_index=True)
+
+                # generate the hop column
+                hop_ids_r = cudf.Series(hop_ids_r, name='hop_id').repeat(
+                    cupy.diff(label_hop_offsets)
+                ).reset_index(drop=True)
+            else:
+                hop_ids_r = cudf.Series(name='hop_id', dtype='int32')
+
+            results_df = results_df.join(hop_ids_r, how='outer').sort_index()
+
+        if major_col_name not in results_df:
+            if use_legacy_names:
+                raise ValueError("Can't use legacy names with major offsets")
+
+            major_offsets_series = cudf.Series(cupy_array_dict['major_offsets'], name='major_offsets')
+            if len(major_offsets_series) > len(results_df):
+                # this is extremely rare so the inefficiency is ok
+                results_df = results_df.join(major_offsets_series, how='outer').sort_index()
+            else:
+                results_df['major_offsets'] = major_offsets_series
+
+    else:
+        # TODO this is deprecated, remove it in 23.12
+
+        results_df[major_col_name] = cupy_array_dict['sources']
+        results_df[minor_col_name] = cupy_array_dict['destinations']
+        indices = cupy_array_dict['indices']
+
+        if indices is None:
results_df["indices"] = None + else: + results_df["indices"] = indices + if weight_t == "int32": + results_df["indices"] = indices.astype("int32") + elif weight_t == "int64": + results_df["indices"] = indices.astype("int64") + else: + results_df["indices"] = indices + + if return_offsets: + if renumber: + return results_df, offsets_df, renumber_df + else: + return results_df, offsets_df + + if renumber: + return results_df, renumber_df + + return (results_df,) diff --git a/python/cugraph/cugraph/sampling/uniform_neighbor_sample.py b/python/cugraph/cugraph/sampling/uniform_neighbor_sample.py index 80f091c4bdd..8df7640e4c7 100644 --- a/python/cugraph/cugraph/sampling/uniform_neighbor_sample.py +++ b/python/cugraph/cugraph/sampling/uniform_neighbor_sample.py @@ -16,6 +16,8 @@ from pylibcugraph import ResourceHandle from pylibcugraph import uniform_neighbor_sample as pylibcugraph_uniform_neighbor_sample +from cugraph.sampling.sampling_utilities import sampling_results_from_cupy_array_dict + import numpy import cudf @@ -237,7 +239,7 @@ def uniform_neighbor_sample( "The with_edge_properties flag is deprecated" " and will be removed in the next release." ) - warnings.warn(warning_msg, DeprecationWarning) + warnings.warn(warning_msg, FutureWarning) if isinstance(start_list, int): start_list = [start_list] @@ -295,7 +297,7 @@ def uniform_neighbor_sample( start_list = start_list.rename(columns={columns[0]: start_col_name}) - sampling_result = pylibcugraph_uniform_neighbor_sample( + sampling_result_array_dict = pylibcugraph_uniform_neighbor_sample( resource_handle=ResourceHandle(), input_graph=G._plc_graph, start_list=start_list[start_col_name], @@ -316,143 +318,22 @@ def uniform_neighbor_sample( return_dict=True, ) - results_df = cudf.DataFrame() - - if with_edge_properties: - results_df_cols = [ - 'majors', - 'minors', - 'weight', - 'edge_id', - 'edge_type', - 'hop_id' - ] - for col in results_df_cols: - array = sampling_result[col] - if array is not None: - # The length of each of these arrays should be the same - results_df[col] = array - - results_df.rename(columns={'majors':major_col_name, 'minors':minor_col_name},inplace=True) - - label_hop_offsets = sampling_result['label_hop_offsets'] - batch_ids = sampling_result['batch_id'] - - if renumber: - renumber_df = cudf.DataFrame({ - 'map': sampling_result['renumber_map'], - }) - - if not return_offsets: - if len(batch_ids) > 0: - batch_ids_r = cudf.Series(batch_ids).repeat( - cp.diff(sampling_result['renumber_map_offsets']) - ) - batch_ids_r.reset_index(drop=True, inplace=True) - renumber_df["batch_id"] = batch_ids_r - else: - renumber_df['batch_id'] = None - - if return_offsets: - batches_series = cudf.Series( - batch_ids, - name="batch_id", - ) - if include_hop_column: - # TODO remove this logic in release 23.12 - offsets_df = cudf.Series( - label_hop_offsets[cp.arange(len(batch_ids)+1) * len(fanout_vals)], - name='offsets', - ).to_frame() - else: - offsets_df = cudf.Series( - label_hop_offsets, - name="offsets", - ).to_frame() - - if len(batches_series) > len(offsets_df): - # this is extremely rare so the inefficiency is ok - offsets_df = offsets_df.join(batches_series, how='outer').sort_index() - else: - offsets_df['batch_id'] = batches_series - - if renumber: - renumber_offset_series = cudf.Series( - sampling_result['renumber_map_offsets'], - name="renumber_map_offsets" - ) - - if len(renumber_offset_series) > len(renumber_df): - # this is extremely rare so the inefficiency is ok - renumber_df = renumber_df.join(renumber_offset_series, 
how='outer').sort_index()
-                else:
-                    renumber_df['renumber_map_offsets'] = renumber_offset_series
-
-        else:
-            if len(batch_ids) > 0:
-                batch_ids_r = cudf.Series(cp.repeat(batch_ids, len(fanout_vals)))
-                batch_ids_r = cudf.Series(batch_ids_r).repeat(cp.diff(label_hop_offsets))
-                batch_ids_r.reset_index(drop=True, inplace=True)
-
-                results_df["batch_id"] = batch_ids_r
-            else:
-                results_df['batch_id'] = None
-
-        # TODO remove this logic in release 23.12, hops will always returned as offsets
-        if include_hop_column:
-            if len(batch_ids) > 0:
-                hop_ids_r = cudf.Series(cp.arange(len(fanout_vals)))
-                hop_ids_r = cudf.concat([hop_ids_r] * len(batch_ids),ignore_index=True)
-
-                # generate the hop column
-                hop_ids_r = cudf.Series(hop_ids_r, name='hop_id').repeat(
-                    cp.diff(label_hop_offsets)
-                ).reset_index(drop=True)
-            else:
-                hop_ids_r = cudf.Series(name='hop_id', dtype='int32')
-
-            results_df = results_df.join(hop_ids_r, how='outer').sort_index()
-
-        if major_col_name not in results_df:
-            if use_legacy_names:
-                raise ValueError("Can't use legacy names with major offsets")
-
-            major_offsets_series = cudf.Series(sampling_result['major_offsets'], name='major_offsets')
-            if len(major_offsets_series) > len(results_df):
-                # this is extremely rare so the inefficiency is ok
-                results_df = results_df.join(major_offsets_series, how='outer').sort_index()
-            else:
-                results_df['major_offsets'] = major_offsets_series
-
-    else:
-        # TODO this is deprecated, remove it in 23.12
-
-        results_df[major_col_name] = sampling_result['sources']
-        results_df[minor_col_name] = sampling_result['destinations']
-        indices = sampling_result['indices']
-
-        if indices is None:
-            results_df["indices"] = None
-        else:
-            results_df["indices"] = indices
-            if weight_t == "int32":
-                results_df["indices"] = indices.astype("int32")
-            elif weight_t == "int64":
-                results_df["indices"] = indices.astype("int64")
-            else:
-                results_df["indices"] = indices
+    dfs = list(sampling_results_from_cupy_array_dict(
+        sampling_result_array_dict,
+        weight_t,
+        len(fanout_vals),
+        with_edge_properties=with_edge_properties,
+        return_offsets=return_offsets,
+        renumber=renumber,
+        use_legacy_names=use_legacy_names,
+        include_hop_column=include_hop_column,
+    ))
 
     if G.renumbered and not renumber:
-        results_df = G.unrenumber(results_df, major_col_name, preserve_order=True)
-        results_df = G.unrenumber(results_df, minor_col_name, preserve_order=True)
+        dfs[0] = G.unrenumber(dfs[0], major_col_name, preserve_order=True)
+        dfs[0] = G.unrenumber(dfs[0], minor_col_name, preserve_order=True)
 
-    if return_offsets:
-        if renumber:
-            return results_df, offsets_df, renumber_df
-        else:
-            return results_df, offsets_df
-
-    if renumber:
-        return results_df, renumber_df
-
-    return results_df
+    if len(dfs) > 1:
+        return tuple(dfs)
+
+    return dfs[0]
diff --git a/python/cugraph/cugraph/tests/sampling/test_uniform_neighbor_sample_mg.py b/python/cugraph/cugraph/tests/sampling/test_uniform_neighbor_sample_mg.py
index 9d87c097287..42bc2d400b9 100644
--- a/python/cugraph/cugraph/tests/sampling/test_uniform_neighbor_sample_mg.py
+++ b/python/cugraph/cugraph/tests/sampling/test_uniform_neighbor_sample_mg.py
@@ -327,6 +327,7 @@ def test_mg_uniform_neighbor_sample_ensure_no_duplicates(dask_client):
 @pytest.mark.mg
 @pytest.mark.cugraph_ops
 @pytest.mark.parametrize("return_offsets", [True, False])
+@pytest.mark.tags("runme")
 def test_uniform_neighbor_sample_edge_properties(dask_client, return_offsets):
     n_workers = len(dask_client.scheduler_info()["workers"])
     if n_workers <= 1:
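
Illustrative usage of the new shared helper: the snippet below is a minimal, hand-built sketch of how sampling_results_from_cupy_array_dict consumes the dictionary that pylibcugraph's uniform_neighbor_sample produces when return_dict=True. The dictionary keys ('majors', 'minors', 'weight', 'edge_id', 'edge_type', 'hop_id', 'batch_id', 'label_hop_offsets') are the ones read by the helper in this patch; the toy arrays, num_hops value, and variable names are invented for illustration and are not real sampler output.

    import cupy

    from cugraph.sampling.sampling_utilities import (
        sampling_results_from_cupy_array_dict,
    )

    # One sampled batch, two hops: 2 edges in hop 0 and 3 edges in hop 1,
    # so label_hop_offsets has num_batches * num_hops + 1 entries.
    num_hops = 2
    toy_result_dict = {
        "majors": cupy.asarray([0, 0, 1, 2, 2], dtype="int32"),
        "minors": cupy.asarray([1, 2, 3, 3, 4], dtype="int32"),
        "weight": cupy.asarray([1.0, 1.0, 0.5, 0.5, 2.0], dtype="float32"),
        "edge_id": None,   # optional outputs may come back as None
        "edge_type": None,
        "hop_id": None,    # hops are encoded by label_hop_offsets instead
        "batch_id": cupy.asarray([0], dtype="int32"),
        "label_hop_offsets": cupy.asarray([0, 2, 5], dtype="int64"),
    }

    # With return_offsets=False and renumber=False the helper returns a
    # one-element tuple holding the results DataFrame.
    (results_df,) = sampling_results_from_cupy_array_dict(
        toy_result_dict,
        "float32",                # weight_t (only used by the deprecated path)
        num_hops,
        with_edge_properties=True,
        return_offsets=False,
        renumber=False,
        use_legacy_names=False,   # emit 'majors'/'minors' column names
        include_hop_column=True,  # reconstruct a per-edge hop_id column
    )

    # Expected columns: majors, minors, weight, batch_id, hop_id.
    print(results_df)

With return_offsets=True and/or renumber=True the helper instead returns (results_df, offsets_df), (results_df, renumber_df), or (results_df, offsets_df, renumber_df), which is why the single-GPU wrapper above treats the return value as a sequence and unrenumbers dfs[0].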