Skip to content

Commit

Permalink
pass list of edgelist to the plc graph creation
Browse files Browse the repository at this point in the history
  • Loading branch information
jnke2016 committed Nov 18, 2023
1 parent 0fc84bd commit 76e5c15
Show file tree
Hide file tree
Showing 3 changed files with 169 additions and 82 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -89,47 +89,55 @@ def _make_plc_graph(
src_col_name,
dst_col_name,
store_transposed,
num_edges,
):

weights = None
edge_ids = None
edge_types = None
num_arrays = len(edata_x)

print("edata = \n", edata_x)

if simpleDistributedGraphImpl.edgeWeightCol in edata_x[0]:
weights = _get_column_from_ls_dfs(
edata_x, simpleDistributedGraphImpl.edgeWeightCol
)
#weights = edata_x[simpleDistributedGraphImpl.edgeWeightCol]
weights = [edata_x[i][simpleDistributedGraphImpl.edgeWeightCol] for i in range(num_arrays)]
if weights.dtype == "int32":
weights = weights.astype("float32")
#weights = weights.astype("float32")
weights = [w_array.astype("float32") for w_array in weights]
elif weights.dtype == "int64":
weights = weights.astype("float64")
#weights = weights.astype("float64")
weights = [w_array.astype("float64") for w_array in weights]

if simpleDistributedGraphImpl.edgeIdCol in edata_x[0]:
edge_ids = _get_column_from_ls_dfs(
edata_x, simpleDistributedGraphImpl.edgeIdCol
)
#edge_ids = edata_x[simpleDistributedGraphImpl.edgeIdCol]
edge_ids = [edata_x[i][simpleDistributedGraphImpl.edgeIdCol] for i in range(num_arrays)]
if edata_x[0][src_col_name].dtype == "int64" and edge_ids.dtype != "int64":
edge_ids = edge_ids.astype("int64")
#edge_ids = edge_ids.astype("int64")
edge_ids = [e_id_array.astype("int64") for e_id_array in edge_ids]
warnings.warn(
f"Vertex type is int64 but edge id type is {edge_ids.dtype}"
f"Vertex type is int64 but edge id type is {edge_ids[0].dtype}"
", automatically casting edge id type to int64. "
"This may cause extra memory usage. Consider passing"
" a int64 list of edge ids instead."
)
if simpleDistributedGraphImpl.edgeTypeCol in edata_x[0]:
edge_types = _get_column_from_ls_dfs(
edata_x, simpleDistributedGraphImpl.edgeTypeCol
)
#edge_types = edata_x[simpleDistributedGraphImpl.edgeTypeCol]
edge_types = [edata_x[i][simpleDistributedGraphImpl.edgeTypeCol] for i in range(num_arrays)]

#src_array = [edata_x[i][src_col_name] for i in range(len(edata_x))]
#src_array.append(edata_x[i][src_col_name] for i in range(len(edata_x)))
#print("***src_array = \n", src_array)
#print("src_array =\n", edata_x[0][src_col_name])

return MGGraph(
resource_handle=ResourceHandle(Comms.get_handle(sID).getHandle()),
graph_properties=graph_props,
src_array=_get_column_from_ls_dfs(edata_x, src_col_name),
dst_array=_get_column_from_ls_dfs(edata_x, dst_col_name),
src_array=[edata_x[i][src_col_name] for i in range(num_arrays)],
dst_array=[edata_x[i][dst_col_name] for i in range(num_arrays)],
weight_array=weights,
edge_id_array=edge_ids,
edge_type_array=edge_types,
num_arrays=num_arrays,
store_transposed=store_transposed,
do_expensive_check=False,
)
Expand Down Expand Up @@ -322,10 +330,7 @@ def __from_edgelist(
ddf, _client, return_type="dict"
)
del ddf
length_of_parts = get_length_of_parts(persisted_keys_d, _client)
num_edges = sum(
[item for sublist in length_of_parts.values() for item in sublist]
)

delayed_tasks_d = {
w: delayed(simpleDistributedGraphImpl._make_plc_graph)(
Comms.get_session_id(),
Expand All @@ -334,7 +339,6 @@ def __from_edgelist(
src_col_name,
dst_col_name,
store_transposed,
num_edges,
)
for w, edata in persisted_keys_d.items()
}
Expand Down
2 changes: 1 addition & 1 deletion python/pylibcugraph/pylibcugraph/_cugraph_c/graph.pxd
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
# Copyright (c) 2022, NVIDIA CORPORATION.
# Copyright (c) 2022-2023, NVIDIA CORPORATION.
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
Expand Down
203 changes: 143 additions & 60 deletions python/pylibcugraph/pylibcugraph/graphs.pyx
Original file line number Diff line number Diff line change
Expand Up @@ -44,8 +44,14 @@ from pylibcugraph.graph_properties cimport (
from pylibcugraph.utils cimport (
assert_success,
assert_CAI_type,
get_c_type_from_numpy_type,
create_cugraph_type_erased_device_array_view_from_py_obj,
)
from pylibcugraph._cugraph_c.array cimport (
cugraph_type_erased_device_array_view_create,
)
from libc.stdlib cimport malloc



cdef class SGGraph(_GPUGraph):
Expand Down Expand Up @@ -301,7 +307,7 @@ cdef class MGGraph(_GPUGraph):
Set to True if the graph should be transposed. This is required for some
algorithms, such as pagerank.
num_edges : int
num_arrays : size_t # FIXME: Should this parameter be internal to PLC?
Number of edges
do_expensive_check : bool
Expand Down Expand Up @@ -339,6 +345,9 @@ cdef class MGGraph(_GPUGraph):
drop_self_loops=False,
drop_multi_edges=False):

print("plc-num_arrays = ", num_arrays)
#print("the len of the src array = ", len(src_array[0]))
#print("the len of the src array = ", len(src_array[1]))
# FIXME: add tests for these
if not(isinstance(store_transposed, (int, bool))):
raise TypeError("expected int or bool for store_transposed, got "
Expand All @@ -347,18 +356,6 @@ cdef class MGGraph(_GPUGraph):
if not(isinstance(do_expensive_check, (int, bool))):
raise TypeError("expected int or bool for do_expensive_check, got "
f"{type(do_expensive_check)}")
assert_CAI_type(src_array, "src_array")
assert_CAI_type(dst_array, "dst_array")
assert_CAI_type(weight_array, "weight_array", True)
assert_CAI_type(vertices_array, "vertices_array", True)

assert_CAI_type(edge_id_array, "edge_id_array", True)
if edge_id_array is not None and len(edge_id_array) != len(src_array):
raise ValueError('Edge id array must be same length as edgelist')

assert_CAI_type(edge_type_array, "edge_type_array", True)
if edge_type_array is not None and len(edge_type_array) != len(src_array):
raise ValueError('Edge type array must be same length as edgelist')

# FIXME: assert that src_array and dst_array have the same type

Expand All @@ -368,12 +365,28 @@ cdef class MGGraph(_GPUGraph):
# FIXME: Assert that the 'vertices', 'src', 'dst', 'weights', 'edge_ids'
# and 'edge_type_ids' are of size 'num_arrays'.

#cdef cugraph_type_erased_device_array_view_t* srcs_view_ptr[0] = \
# create_cugraph_type_erased_device_array_view_from_py_obj(
# src_array
# )

cdef cugraph_type_erased_device_array_view_t** srcs_view_ptr_ptr = NULL
#cdef cugraph_type_erased_device_array_view_t** srcs_view_ptr_ptr[2]
cdef cugraph_type_erased_device_array_view_t** srcs_view_ptr_ptr = <cugraph_type_erased_device_array_view_t **>malloc(num_arrays * sizeof(cugraph_type_erased_device_array_view_t*))
cdef cugraph_type_erased_device_array_view_t** dsts_view_ptr_ptr = <cugraph_type_erased_device_array_view_t **>malloc(num_arrays * sizeof(cugraph_type_erased_device_array_view_t*))
cdef cugraph_type_erased_device_array_view_t** vertices_view_ptr_ptr = NULL
self.weights_view_ptr_ptr = NULL
self.edge_id_view_ptr_ptr = NULL
cdef cugraph_type_erased_device_array_view_t** edge_type_view_ptr_ptr = NULL

print("size of 'cugraph_type_erased_device_array_view_t' = ", sizeof(cugraph_type_erased_device_array_view_t))
print("size of 'cugraph_type_erased_device_array_view_t*' = ", sizeof(cugraph_type_erased_device_array_view_t*)) # working

#cdef cugraph_type_erased_device_array_view_t* srcs_view_ptr_ptr_
"""
print("\n Cleaning memory")
for i in range(2):
srcs_view_ptr_ptr[i] = NULL
#print("v_0 = ", srcs_view_ptr_ptr[0])
#print("v_0 = ", srcs_view_ptr_ptr[2])
print("\n Done cleaning memory")
"""
"""
cdef cugraph_type_erased_device_array_view_t* srcs_view_ptr = NULL
cdef cugraph_type_erased_device_array_view_t** dsts_view_ptr_ptr = NULL
Expand All @@ -382,72 +395,139 @@ cdef class MGGraph(_GPUGraph):
cdef cugraph_type_erased_device_array_view_t** vertices_view_ptr_ptr = NULL
cdef cugraph_type_erased_device_array_view_t* vertices_view_ptr = NULL
#cdef cugraph_type_erased_device_array_view_t** weights_view_ptr_ptr = NULL
#cdef cugraph_type_erased_device_array_view_t* weights_view_ptr = NULL

#cdef cugraph_type_erased_device_array_view_t** edge_id_view_ptr_ptr = NULL
#cdef cugraph_type_erased_device_array_view_t* edge_id_view_ptr = NULL

cdef cugraph_type_erased_device_array_view_t** edge_type_view_ptr_ptr = NULL
cdef cugraph_type_erased_device_array_view_t* edge_type_view_ptr = NULL
"""


#list_ = []
for i in range(num_arrays):
print("the len of the src array = ", len(src_array[i]))
if do_expensive_check:
# FIXME: This assumes that src, dst, ... arrays are lists which is always
# 'True' because there are at least two partitions per workers.
assert_CAI_type(src_array[i], "src_array")
assert_CAI_type(dst_array[i], "dst_array")
assert_CAI_type(weight_array[i], "weight_array", True)
assert_CAI_type(vertices_array[i], "vertices_array", True)

assert_CAI_type(edge_id_array[i], "edge_id_array", True)

if edge_id_array is not None and len(edge_id_array[i]) != len(src_array[i]):
raise ValueError('Edge id array must be same length as edgelist')

assert_CAI_type(edge_type_array[i], "edge_type_array", True)
if edge_type_array[i] is not None and len(edge_type_array[i]) != len(src_array[i]):
raise ValueError('Edge type array must be same length as edgelist')

"""
srcs_view_ptr = create_cugraph_type_erased_device_array_view_from_py_obj(
src_array
src_array[i]
)
srcs_view_ptr_ptr = &(srcs_view_ptr)
srcs_view_ptr_ptr += 1

dsts_view_ptr = create_cugraph_type_erased_device_array_view_from_py_obj(
dst_array
"""

"""
cai_ptr = src_array[i].__cuda_array_interface__["data"][0]
print("before assigning")
srcs_view_ptr_ptr[i] = cugraph_type_erased_device_array_view_create(
<void*>cai_ptr,
len(src_array[i]),
get_c_type_from_numpy_type(src_array[i].dtype))
print("after assigning")
"""


#srcs_view_ptr_ptr = &(srcs_view_ptr)
"""
if i == 0:
srcs_view_ptr_ptr_ = srcs_view_ptr_ptr
"""
print("plc - ****** i = ", i)

srcs_view_ptr_ptr[i] = create_cugraph_type_erased_device_array_view_from_py_obj(
src_array[i]
)
#print("plc - address after = %p", &srcs_view_ptr_ptr)
#print("**srcs_view_ptr_ptr before = {0:x}".format(<unsigned long>srcs_view_ptr_ptr))
#print("**srcs_view_ptr before = {0:x}".format(<unsigned long>srcs_view_ptr))
#printf("**before -- %p", <unsigned long>srcs_view_ptr_ptr)
#srcs_view_ptr_ptr += 1
#srcs_view_ptr += 1
#srcs_view_ptr_ptr = NULL
#print("**srcs_view_ptr_ptr after ={0:x}".format(<unsigned long>srcs_view_ptr_ptr))
#print("**srcs_view_ptr after ={0:x}".format(<unsigned long>srcs_view_ptr))

#print("plc - address after = %p", &srcs_view_ptr_ptr)
#srcs_view_ptr += 1



dsts_view_ptr_ptr[i] = create_cugraph_type_erased_device_array_view_from_py_obj(
dst_array[i]
)
"""
dsts_view_ptr_ptr = &(dsts_view_ptr)
dsts_view_ptr_ptr += 1
"""

if vertices_array is not None:
vertices_view_ptr = create_cugraph_type_erased_device_array_view_from_py_obj(
vertices_array
vertices_view_ptr_ptr = <cugraph_type_erased_device_array_view_t **>malloc(num_arrays * sizeof(cugraph_type_erased_device_array_view_t*))
vertices_view_ptr_ptr[i] = create_cugraph_type_erased_device_array_view_from_py_obj(
vertices_array[i]
)
"""
vertices_view_ptr_ptr = &(vertices_view_ptr)
vertices_view_ptr_ptr += 1
"""
else:
# Need to free the memory created
pass

# FIXME: When checking wethere a graph has weights or edge ids, properly handle
# SG VS MG as SG will have 'self.weights_view_ptr_ptr' non 'NULL'.

if weight_array is not None:
self.weights_view_ptr = create_cugraph_type_erased_device_array_view_from_py_obj(
weight_array
self.weights_view_ptr_ptr = <cugraph_type_erased_device_array_view_t **>malloc(num_arrays * sizeof(cugraph_type_erased_device_array_view_t*))
self.weights_view_ptr_ptr[i] = create_cugraph_type_erased_device_array_view_from_py_obj(
weight_array[i]
)
self.weights_view_ptr_ptr = &(self.weights_view_ptr)
self.weights_view_ptr_ptr += 1
#self.weights_view_ptr_ptr = &(self.weights_view_ptr)
#self.weights_view_ptr_ptr += 1

if edge_id_array is not None:
self.edge_id_view_ptr = create_cugraph_type_erased_device_array_view_from_py_obj(
edge_id_array
self.edge_id_view_ptr_ptr = <cugraph_type_erased_device_array_view_t **>malloc(num_arrays * sizeof(cugraph_type_erased_device_array_view_t*))
self.edge_id_view_ptr_ptr[i] = create_cugraph_type_erased_device_array_view_from_py_obj(
edge_id_array[i]
)
self.edge_id_view_ptr_ptr = &(self.edge_id_view_ptr)
self.edge_id_view_ptr_ptr += 1
#self.edge_id_view_ptr_ptr = &(self.edge_id_view_ptr)
#self.edge_id_view_ptr_ptr += 1

if edge_type_array is not None:
edge_type_view_ptr = create_cugraph_type_erased_device_array_view_from_py_obj(
edge_type_array
edge_type_view_ptr_ptr = <cugraph_type_erased_device_array_view_t **>malloc(num_arrays * sizeof(cugraph_type_erased_device_array_view_t*))
edge_type_view_ptr_ptr[i] = create_cugraph_type_erased_device_array_view_from_py_obj(
edge_type_array[i]
)
edge_type_view_ptr_ptr = &(edge_type_view_ptr)
edge_type_view_ptr_ptr += 1
#edge_type_view_ptr_ptr = &(edge_type_view_ptr)
#edge_type_view_ptr_ptr += 1


srcs_view_ptr_ptr -= num_arrays
dsts_view_ptr_ptr -= num_arrays
#srcs_view_ptr_ptr -= 1
##print
"""
dsts_view_ptr_ptr -= 2
if vertices_array:
vertices_view_ptr_ptr -= num_arrays
vertices_view_ptr_ptr -= 2
#pass
if weight_array:
self.weights_view_ptr_ptr -= num_arrays
self.weights_view_ptr_ptr -= 2
#pass
if edge_id_array:
self.edge_id_view_ptr_ptr -= num_arrays
self.edge_id_view_ptr_ptr -= 2
#pass
if edge_type_array:
self.edge_type_view_ptr_ptr -= num_arrays

self.edge_type_view_ptr_ptr -= 2
#pass
"""
print("\nrunning graph\n", flush=True)
error_code = cugraph_graph_create_mg(
resource_handle.c_resource_handle_ptr,
&(graph_properties.c_graph_properties),
Expand All @@ -467,15 +547,18 @@ cdef class MGGraph(_GPUGraph):

assert_success(error_code, error_ptr,
"cugraph_mg_graph_create()")

print("\nDone running graph\n", flush=True)

cugraph_type_erased_device_array_view_free(srcs_view_ptr_ptr[0])
cugraph_type_erased_device_array_view_free(dsts_view_ptr_ptr[0])
if self.weights_view_ptr is not NULL:
cugraph_type_erased_device_array_view_free(self.weights_view_ptr_ptr[0])
if self.edge_id_view_ptr is not NULL:
cugraph_type_erased_device_array_view_free(self.edge_id_view_ptr_ptr[0])
if edge_type_view_ptr is not NULL:
cugraph_type_erased_device_array_view_free(edge_type_view_ptr_ptr[0])
for i in range(num_arrays):
cugraph_type_erased_device_array_view_free(srcs_view_ptr_ptr[i])
cugraph_type_erased_device_array_view_free(dsts_view_ptr_ptr[i])
if self.weights_view_ptr_ptr is not NULL:
cugraph_type_erased_device_array_view_free(self.weights_view_ptr_ptr[i])
if self.edge_id_view_ptr_ptr is not NULL:
cugraph_type_erased_device_array_view_free(self.edge_id_view_ptr_ptr[i])
if edge_type_view_ptr_ptr is not NULL:
cugraph_type_erased_device_array_view_free(edge_type_view_ptr_ptr[i])

def __dealloc__(self):
if self.c_graph_ptr is not NULL:
Expand Down

0 comments on commit 76e5c15

Please sign in to comment.