Skip to content

Commit

Permalink
refactor code into new shared utility
Browse files Browse the repository at this point in the history
  • Loading branch information
alexbarghi-nv committed Sep 20, 2023
1 parent 74195cb commit 374b103
Show file tree
Hide file tree
Showing 4 changed files with 331 additions and 210 deletions.
206 changes: 135 additions & 71 deletions python/cugraph/cugraph/dask/sampling/uniform_neighbor_sample.py
Original file line number Diff line number Diff line change
Expand Up @@ -122,100 +122,163 @@ def create_empty_df_with_edge_props(


def convert_to_cudf(
    cupy_array_dict,
    weight_t,
    num_hops,
    with_edge_properties=False,
    return_offsets=False,
    renumber=False,
    use_legacy_names=True,
    include_hop_column=True,
):
    """
    Creates cudf DataFrame(s) from the dictionary of cupy arrays returned by
    the pylibcugraph uniform_neighbor_sample wrapper (called with
    return_dict=True).

    Parameters
    ----------
    cupy_array_dict : dict
        Keyed cupy arrays from pylibcugraph.  Keys read here include
        'majors', 'minors', 'weight', 'edge_id', 'edge_type', 'hop_id',
        'label_hop_offsets', 'batch_id', 'renumber_map',
        'renumber_map_offsets', 'major_offsets', and (deprecated path)
        'sources', 'destinations', 'indices'.
    weight_t : str
        Weight dtype name; 'int32'/'int64' force a cast of the 'indices'
        column in the deprecated no-edge-properties path.
    num_hops : int
        Number of sampling hops; used to expand per-(batch, hop) offsets
        into per-batch quantities.
    with_edge_properties : bool, default False
        Whether edge properties (weight, edge id/type, hop) were sampled.
    return_offsets : bool, default False
        If True, also return a separate offsets DataFrame instead of a
        per-row 'batch_id' column.
    renumber : bool, default False
        If True, also return the renumber-map DataFrame.
    use_legacy_names : bool, default True
        Emit 'sources'/'destinations' column names (deprecated) instead of
        'majors'/'minors'.
    include_hop_column : bool, default True
        Emit a per-row 'hop_id' column (deprecated; hops will be offsets
        only in 23.12).

    Returns
    -------
    results_df, optionally followed by offsets_df (if return_offsets) and
    renumber_df (if renumber).
    """
    results_df = cudf.DataFrame()

    if use_legacy_names:
        major_col_name = "sources"
        minor_col_name = "destinations"
        warning_msg = (
            "The legacy column names (sources, destinations)"
            " will no longer be supported for uniform_neighbor_sample"
            " in release 23.12. The use_legacy_names=False option will"
            " become the only option, and (majors, minors) will be the"
            " only supported column names."
        )
        warnings.warn(warning_msg, FutureWarning)
    else:
        major_col_name = "majors"
        minor_col_name = "minors"

    if with_edge_properties:
        results_df_cols = [
            "majors",
            "minors",
            "weight",
            "edge_id",
            "edge_type",
            "hop_id",
        ]

        for col in results_df_cols:
            array = cupy_array_dict[col]
            if array is not None:
                # The length of each of these arrays should be the same
                results_df[col] = array

        results_df.rename(
            columns={"majors": major_col_name, "minors": minor_col_name},
            inplace=True,
        )

        label_hop_offsets = cupy_array_dict["label_hop_offsets"]
        batch_ids = cupy_array_dict["batch_id"]

        if renumber:
            renumber_df = cudf.DataFrame(
                {
                    "map": cupy_array_dict["renumber_map"],
                }
            )

            if not return_offsets:
                if len(batch_ids) > 0:
                    # expand one batch id per renumber-map entry
                    batch_ids_r = cudf.Series(batch_ids).repeat(
                        cp.diff(cupy_array_dict["renumber_map_offsets"])
                    )
                    batch_ids_r.reset_index(drop=True, inplace=True)
                    renumber_df["batch_id"] = batch_ids_r
                else:
                    renumber_df["batch_id"] = None

        if return_offsets:
            batches_series = cudf.Series(
                batch_ids,
                name="batch_id",
            )
            if include_hop_column:
                # TODO remove this logic in release 23.12
                # collapse per-(batch, hop) offsets to per-batch offsets by
                # taking every num_hops-th entry
                offsets_df = cudf.Series(
                    label_hop_offsets[cp.arange(len(batch_ids) + 1) * num_hops],
                    name="offsets",
                ).to_frame()
            else:
                offsets_df = cudf.Series(
                    label_hop_offsets,
                    name="offsets",
                ).to_frame()

            if len(batches_series) > len(offsets_df):
                # this is extremely rare so the inefficiency is ok
                offsets_df = offsets_df.join(batches_series, how="outer").sort_index()
            else:
                offsets_df["batch_id"] = batches_series

            if renumber:
                renumber_offset_series = cudf.Series(
                    cupy_array_dict["renumber_map_offsets"],
                    name="renumber_map_offsets",
                )

                if len(renumber_offset_series) > len(renumber_df):
                    # this is extremely rare so the inefficiency is ok
                    renumber_df = renumber_df.join(
                        renumber_offset_series, how="outer"
                    ).sort_index()
                else:
                    renumber_df["renumber_map_offsets"] = renumber_offset_series
        else:
            if len(batch_ids) > 0:
                # one offset per (batch, hop) pair -> repeat each batch id
                # num_hops times before expanding by the offsets
                batch_ids_r = cudf.Series(cp.repeat(batch_ids, num_hops))
                batch_ids_r = cudf.Series(batch_ids_r).repeat(
                    cp.diff(label_hop_offsets)
                )
                batch_ids_r.reset_index(drop=True, inplace=True)

                results_df["batch_id"] = batch_ids_r
            else:
                results_df["batch_id"] = None

        # TODO remove this logic in release 23.12, hops will always returned as offsets
        if include_hop_column:
            if len(batch_ids) > 0:
                hop_ids_r = cudf.Series(cp.arange(num_hops))
                hop_ids_r = cudf.concat([hop_ids_r] * len(batch_ids), ignore_index=True)

                # generate the hop column
                hop_ids_r = (
                    cudf.Series(hop_ids_r, name="hop_id")
                    .repeat(cp.diff(label_hop_offsets))
                    .reset_index(drop=True)
                )
            else:
                hop_ids_r = cudf.Series(name="hop_id", dtype="int32")

            results_df = results_df.join(hop_ids_r, how="outer").sort_index()

        if major_col_name not in results_df:
            # majors were returned compressed as offsets (CSR-style)
            if use_legacy_names:
                raise ValueError("Can't use legacy names with major offsets")

            major_offsets_series = cudf.Series(
                cupy_array_dict["major_offsets"], name="major_offsets"
            )
            if len(major_offsets_series) > len(results_df):
                # this is extremely rare so the inefficiency is ok
                results_df = results_df.join(
                    major_offsets_series, how="outer"
                ).sort_index()
            else:
                results_df["major_offsets"] = major_offsets_series

    else:
        # TODO this is deprecated, remove it in 23.12

        results_df[major_col_name] = cupy_array_dict["sources"]
        results_df[minor_col_name] = cupy_array_dict["destinations"]
        indices = cupy_array_dict["indices"]

        if indices is None:
            results_df["indices"] = None
        else:
            results_df["indices"] = indices
            if weight_t == "int32":
                results_df["indices"] = indices.astype("int32")
            elif weight_t == "int64":
                results_df["indices"] = indices.astype("int64")
            else:
                results_df["indices"] = indices

    if return_offsets:
        if renumber:
            return results_df, offsets_df, renumber_df
        else:
            return results_df, offsets_df

    if renumber:
        return results_df, renumber_df

    return results_df

def __get_label_to_output_comm_rank(min_batch_id, max_batch_id, n_workers):
num_batches = max_batch_id - min_batch_id + 1
Expand Down Expand Up @@ -259,7 +322,7 @@ def _call_plc_uniform_neighbor_sample(
min_batch_id, max_batch_id, n_workers
)

cp_arrays = pylibcugraph_uniform_neighbor_sample(
cp_array_dict = pylibcugraph_uniform_neighbor_sample(
resource_handle=ResourceHandle(Comms.get_handle(sID).getHandle()),
input_graph=mg_graph_x,
start_list=start_list_x,
Expand All @@ -275,9 +338,10 @@ def _call_plc_uniform_neighbor_sample(
deduplicate_sources=deduplicate_sources,
return_hops=return_hops,
renumber=renumber,
return_dict=True
)
return convert_to_cudf(
cp_arrays,
cp_array_dict,
weight_t,
with_edge_properties,
return_offsets=return_offsets,
Expand Down
Loading

0 comments on commit 374b103

Please sign in to comment.