From 76e5c155325de49088cf92e44a643399f667fc8d Mon Sep 17 00:00:00 2001 From: jnke2016 Date: Sat, 18 Nov 2023 15:29:34 -0800 Subject: [PATCH] pass list of edgelist to the plc graph creation --- .../simpleDistributedGraph.py | 46 ++-- .../pylibcugraph/_cugraph_c/graph.pxd | 2 +- python/pylibcugraph/pylibcugraph/graphs.pyx | 203 ++++++++++++------ 3 files changed, 169 insertions(+), 82 deletions(-) diff --git a/python/cugraph/cugraph/structure/graph_implementation/simpleDistributedGraph.py b/python/cugraph/cugraph/structure/graph_implementation/simpleDistributedGraph.py index 6285cd8802f..8c313ce5a37 100644 --- a/python/cugraph/cugraph/structure/graph_implementation/simpleDistributedGraph.py +++ b/python/cugraph/cugraph/structure/graph_implementation/simpleDistributedGraph.py @@ -89,47 +89,55 @@ def _make_plc_graph( src_col_name, dst_col_name, store_transposed, - num_edges, ): weights = None edge_ids = None edge_types = None + num_arrays = len(edata_x) + + print("edata = \n", edata_x) if simpleDistributedGraphImpl.edgeWeightCol in edata_x[0]: - weights = _get_column_from_ls_dfs( - edata_x, simpleDistributedGraphImpl.edgeWeightCol - ) + #weights = edata_x[simpleDistributedGraphImpl.edgeWeightCol] + weights = [edata_x[i][simpleDistributedGraphImpl.edgeWeightCol] for i in range(num_arrays)] if weights.dtype == "int32": - weights = weights.astype("float32") + #weights = weights.astype("float32") + weights = [w_array.astype("float32") for w_array in weights] elif weights.dtype == "int64": - weights = weights.astype("float64") + #weights = weights.astype("float64") + weights = [w_array.astype("float64") for w_array in weights] if simpleDistributedGraphImpl.edgeIdCol in edata_x[0]: - edge_ids = _get_column_from_ls_dfs( - edata_x, simpleDistributedGraphImpl.edgeIdCol - ) + #edge_ids = edata_x[simpleDistributedGraphImpl.edgeIdCol] + edge_ids = [edata_x[i][simpleDistributedGraphImpl.edgeIdCol] for i in range(num_arrays)] if edata_x[0][src_col_name].dtype == "int64" and edge_ids.dtype != "int64": - edge_ids = edge_ids.astype("int64") + #edge_ids = edge_ids.astype("int64") + edge_ids = [e_id_array.astype("int64") for e_id_array in edge_ids] warnings.warn( - f"Vertex type is int64 but edge id type is {edge_ids.dtype}" + f"Vertex type is int64 but edge id type is {edge_ids[0].dtype}" ", automatically casting edge id type to int64. " "This may cause extra memory usage. Consider passing" " a int64 list of edge ids instead." ) if simpleDistributedGraphImpl.edgeTypeCol in edata_x[0]: - edge_types = _get_column_from_ls_dfs( - edata_x, simpleDistributedGraphImpl.edgeTypeCol - ) + #edge_types = edata_x[simpleDistributedGraphImpl.edgeTypeCol] + edge_types = [edata_x[i][simpleDistributedGraphImpl.edgeTypeCol] for i in range(num_arrays)] + + #src_array = [edata_x[i][src_col_name] for i in range(len(edata_x))] + #src_array.append(edata_x[i][src_col_name] for i in range(len(edata_x))) + #print("***src_array = \n", src_array) + #print("src_array =\n", edata_x[0][src_col_name]) return MGGraph( resource_handle=ResourceHandle(Comms.get_handle(sID).getHandle()), graph_properties=graph_props, - src_array=_get_column_from_ls_dfs(edata_x, src_col_name), - dst_array=_get_column_from_ls_dfs(edata_x, dst_col_name), + src_array=[edata_x[i][src_col_name] for i in range(num_arrays)], + dst_array=[edata_x[i][dst_col_name] for i in range(num_arrays)], weight_array=weights, edge_id_array=edge_ids, edge_type_array=edge_types, + num_arrays=num_arrays, store_transposed=store_transposed, do_expensive_check=False, ) @@ -322,10 +330,7 @@ def __from_edgelist( ddf, _client, return_type="dict" ) del ddf - length_of_parts = get_length_of_parts(persisted_keys_d, _client) - num_edges = sum( - [item for sublist in length_of_parts.values() for item in sublist] - ) + delayed_tasks_d = { w: delayed(simpleDistributedGraphImpl._make_plc_graph)( Comms.get_session_id(), @@ -334,7 +339,6 @@ def __from_edgelist( src_col_name, dst_col_name, store_transposed, - num_edges, ) for w, edata in persisted_keys_d.items() } diff --git a/python/pylibcugraph/pylibcugraph/_cugraph_c/graph.pxd b/python/pylibcugraph/pylibcugraph/_cugraph_c/graph.pxd index 8221a3a6e2d..28a9f5a3be5 100644 --- a/python/pylibcugraph/pylibcugraph/_cugraph_c/graph.pxd +++ b/python/pylibcugraph/pylibcugraph/_cugraph_c/graph.pxd @@ -1,4 +1,4 @@ -# Copyright (c) 2022, NVIDIA CORPORATION. +# Copyright (c) 2022-2023, NVIDIA CORPORATION. # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. # You may obtain a copy of the License at diff --git a/python/pylibcugraph/pylibcugraph/graphs.pyx b/python/pylibcugraph/pylibcugraph/graphs.pyx index 799fc1f5e93..4af44829320 100644 --- a/python/pylibcugraph/pylibcugraph/graphs.pyx +++ b/python/pylibcugraph/pylibcugraph/graphs.pyx @@ -44,8 +44,14 @@ from pylibcugraph.graph_properties cimport ( from pylibcugraph.utils cimport ( assert_success, assert_CAI_type, + get_c_type_from_numpy_type, create_cugraph_type_erased_device_array_view_from_py_obj, ) +from pylibcugraph._cugraph_c.array cimport ( + cugraph_type_erased_device_array_view_create, +) +from libc.stdlib cimport malloc + cdef class SGGraph(_GPUGraph): @@ -301,7 +307,7 @@ cdef class MGGraph(_GPUGraph): Set to True if the graph should be transposed. This is required for some algorithms, such as pagerank. - num_edges : int + num_arrays : size_t # FIXME: Should this parameter be internal to PLC? Number of edges do_expensive_check : bool @@ -339,6 +345,9 @@ cdef class MGGraph(_GPUGraph): drop_self_loops=False, drop_multi_edges=False): + print("plc-num_arrays = ", num_arrays) + #print("the len of the src array = ", len(src_array[0])) + #print("the len of the src array = ", len(src_array[1])) # FIXME: add tests for these if not(isinstance(store_transposed, (int, bool))): raise TypeError("expected int or bool for store_transposed, got " @@ -347,18 +356,6 @@ cdef class MGGraph(_GPUGraph): if not(isinstance(do_expensive_check, (int, bool))): raise TypeError("expected int or bool for do_expensive_check, got " f"{type(do_expensive_check)}") - assert_CAI_type(src_array, "src_array") - assert_CAI_type(dst_array, "dst_array") - assert_CAI_type(weight_array, "weight_array", True) - assert_CAI_type(vertices_array, "vertices_array", True) - - assert_CAI_type(edge_id_array, "edge_id_array", True) - if edge_id_array is not None and len(edge_id_array) != len(src_array): - raise ValueError('Edge id array must be same length as edgelist') - - assert_CAI_type(edge_type_array, "edge_type_array", True) - if edge_type_array is not None and len(edge_type_array) != len(src_array): - raise ValueError('Edge type array must be same length as edgelist') # FIXME: assert that src_array and dst_array have the same type @@ -368,12 +365,28 @@ cdef class MGGraph(_GPUGraph): # FIXME: Assert that the 'vertices', 'src', 'dst', 'weights', 'edge_ids' # and 'edge_type_ids' are of size 'num_arrays'. - #cdef cugraph_type_erased_device_array_view_t* srcs_view_ptr[0] = \ - # create_cugraph_type_erased_device_array_view_from_py_obj( - # src_array - # ) - cdef cugraph_type_erased_device_array_view_t** srcs_view_ptr_ptr = NULL + #cdef cugraph_type_erased_device_array_view_t** srcs_view_ptr_ptr[2] + cdef cugraph_type_erased_device_array_view_t** srcs_view_ptr_ptr = malloc(num_arrays * sizeof(cugraph_type_erased_device_array_view_t*)) + cdef cugraph_type_erased_device_array_view_t** dsts_view_ptr_ptr = malloc(num_arrays * sizeof(cugraph_type_erased_device_array_view_t*)) + cdef cugraph_type_erased_device_array_view_t** vertices_view_ptr_ptr = NULL + self.weights_view_ptr_ptr = NULL + self.edge_id_view_ptr_ptr = NULL + cdef cugraph_type_erased_device_array_view_t** edge_type_view_ptr_ptr = NULL + + print("size of 'cugraph_type_erased_device_array_view_t' = ", sizeof(cugraph_type_erased_device_array_view_t)) + print("size of 'cugraph_type_erased_device_array_view_t*' = ", sizeof(cugraph_type_erased_device_array_view_t*)) # working + + #cdef cugraph_type_erased_device_array_view_t* srcs_view_ptr_ptr_ + """ + print("\n Cleaning memory") + for i in range(2): + srcs_view_ptr_ptr[i] = NULL + #print("v_0 = ", srcs_view_ptr_ptr[0]) + #print("v_0 = ", srcs_view_ptr_ptr[2]) + print("\n Done cleaning memory") + """ + """ cdef cugraph_type_erased_device_array_view_t* srcs_view_ptr = NULL cdef cugraph_type_erased_device_array_view_t** dsts_view_ptr_ptr = NULL @@ -382,72 +395,139 @@ cdef class MGGraph(_GPUGraph): cdef cugraph_type_erased_device_array_view_t** vertices_view_ptr_ptr = NULL cdef cugraph_type_erased_device_array_view_t* vertices_view_ptr = NULL - #cdef cugraph_type_erased_device_array_view_t** weights_view_ptr_ptr = NULL - #cdef cugraph_type_erased_device_array_view_t* weights_view_ptr = NULL - - #cdef cugraph_type_erased_device_array_view_t** edge_id_view_ptr_ptr = NULL - #cdef cugraph_type_erased_device_array_view_t* edge_id_view_ptr = NULL - cdef cugraph_type_erased_device_array_view_t** edge_type_view_ptr_ptr = NULL cdef cugraph_type_erased_device_array_view_t* edge_type_view_ptr = NULL + """ - + #list_ = [] for i in range(num_arrays): + print("the len of the src array = ", len(src_array[i])) + if do_expensive_check: + # FIXME: This assumes that src, dst, ... arrays are lists which is always + # 'True' because there are at least two partitions per workers. + assert_CAI_type(src_array[i], "src_array") + assert_CAI_type(dst_array[i], "dst_array") + assert_CAI_type(weight_array[i], "weight_array", True) + assert_CAI_type(vertices_array[i], "vertices_array", True) + + assert_CAI_type(edge_id_array[i], "edge_id_array", True) + + if edge_id_array is not None and len(edge_id_array[i]) != len(src_array[i]): + raise ValueError('Edge id array must be same length as edgelist') + + assert_CAI_type(edge_type_array[i], "edge_type_array", True) + if edge_type_array[i] is not None and len(edge_type_array[i]) != len(src_array[i]): + raise ValueError('Edge type array must be same length as edgelist') + + """ srcs_view_ptr = create_cugraph_type_erased_device_array_view_from_py_obj( - src_array + src_array[i] ) - srcs_view_ptr_ptr = &(srcs_view_ptr) - srcs_view_ptr_ptr += 1 - - dsts_view_ptr = create_cugraph_type_erased_device_array_view_from_py_obj( - dst_array + """ + + """ + cai_ptr = src_array[i].__cuda_array_interface__["data"][0] + print("before assigning") + srcs_view_ptr_ptr[i] = cugraph_type_erased_device_array_view_create( + cai_ptr, + len(src_array[i]), + get_c_type_from_numpy_type(src_array[i].dtype)) + print("after assigning") + """ + + + #srcs_view_ptr_ptr = &(srcs_view_ptr) + """ + if i == 0: + srcs_view_ptr_ptr_ = srcs_view_ptr_ptr + """ + print("plc - ****** i = ", i) + + srcs_view_ptr_ptr[i] = create_cugraph_type_erased_device_array_view_from_py_obj( + src_array[i] + ) + #print("plc - address after = %p", &srcs_view_ptr_ptr) + #print("**srcs_view_ptr_ptr before = {0:x}".format(srcs_view_ptr_ptr)) + #print("**srcs_view_ptr before = {0:x}".format(srcs_view_ptr)) + #printf("**before -- %p", srcs_view_ptr_ptr) + #srcs_view_ptr_ptr += 1 + #srcs_view_ptr += 1 + #srcs_view_ptr_ptr = NULL + #print("**srcs_view_ptr_ptr after ={0:x}".format(srcs_view_ptr_ptr)) + #print("**srcs_view_ptr after ={0:x}".format(srcs_view_ptr)) + + #print("plc - address after = %p", &srcs_view_ptr_ptr) + #srcs_view_ptr += 1 + + + + dsts_view_ptr_ptr[i] = create_cugraph_type_erased_device_array_view_from_py_obj( + dst_array[i] ) + """ dsts_view_ptr_ptr = &(dsts_view_ptr) dsts_view_ptr_ptr += 1 + """ if vertices_array is not None: - vertices_view_ptr = create_cugraph_type_erased_device_array_view_from_py_obj( - vertices_array + vertices_view_ptr_ptr = malloc(num_arrays * sizeof(cugraph_type_erased_device_array_view_t*)) + vertices_view_ptr_ptr[i] = create_cugraph_type_erased_device_array_view_from_py_obj( + vertices_array[i] ) + """ vertices_view_ptr_ptr = &(vertices_view_ptr) vertices_view_ptr_ptr += 1 + """ + else: + # Need to free the memory created + pass # FIXME: When checking wethere a graph has weights or edge ids, properly handle # SG VS MG as SG will have 'self.weights_view_ptr_ptr' non 'NULL'. if weight_array is not None: - self.weights_view_ptr = create_cugraph_type_erased_device_array_view_from_py_obj( - weight_array + self.weights_view_ptr_ptr = malloc(num_arrays * sizeof(cugraph_type_erased_device_array_view_t*)) + self.weights_view_ptr_ptr[i] = create_cugraph_type_erased_device_array_view_from_py_obj( + weight_array[i] ) - self.weights_view_ptr_ptr = &(self.weights_view_ptr) - self.weights_view_ptr_ptr += 1 + #self.weights_view_ptr_ptr = &(self.weights_view_ptr) + #self.weights_view_ptr_ptr += 1 if edge_id_array is not None: - self.edge_id_view_ptr = create_cugraph_type_erased_device_array_view_from_py_obj( - edge_id_array + self.edge_id_view_ptr_ptr = malloc(num_arrays * sizeof(cugraph_type_erased_device_array_view_t*)) + self.edge_id_view_ptr_ptr[i] = create_cugraph_type_erased_device_array_view_from_py_obj( + edge_id_array[i] ) - self.edge_id_view_ptr_ptr = &(self.edge_id_view_ptr) - self.edge_id_view_ptr_ptr += 1 + #self.edge_id_view_ptr_ptr = &(self.edge_id_view_ptr) + #self.edge_id_view_ptr_ptr += 1 if edge_type_array is not None: - edge_type_view_ptr = create_cugraph_type_erased_device_array_view_from_py_obj( - edge_type_array + edge_type_view_ptr_ptr = malloc(num_arrays * sizeof(cugraph_type_erased_device_array_view_t*)) + edge_type_view_ptr_ptr[i] = create_cugraph_type_erased_device_array_view_from_py_obj( + edge_type_array[i] ) - edge_type_view_ptr_ptr = &(edge_type_view_ptr) - edge_type_view_ptr_ptr += 1 + #edge_type_view_ptr_ptr = &(edge_type_view_ptr) + #edge_type_view_ptr_ptr += 1 - srcs_view_ptr_ptr -= num_arrays - dsts_view_ptr_ptr -= num_arrays + #srcs_view_ptr_ptr -= 1 + ##print + """ + dsts_view_ptr_ptr -= 2 if vertices_array: - vertices_view_ptr_ptr -= num_arrays + vertices_view_ptr_ptr -= 2 + #pass if weight_array: - self.weights_view_ptr_ptr -= num_arrays + self.weights_view_ptr_ptr -= 2 + #pass if edge_id_array: - self.edge_id_view_ptr_ptr -= num_arrays + self.edge_id_view_ptr_ptr -= 2 + #pass if edge_type_array: - self.edge_type_view_ptr_ptr -= num_arrays - + self.edge_type_view_ptr_ptr -= 2 + #pass + """ + print("\nrunning graph\n", flush=True) error_code = cugraph_graph_create_mg( resource_handle.c_resource_handle_ptr, &(graph_properties.c_graph_properties), @@ -467,15 +547,18 @@ cdef class MGGraph(_GPUGraph): assert_success(error_code, error_ptr, "cugraph_mg_graph_create()") + + print("\nDone running graph\n", flush=True) - cugraph_type_erased_device_array_view_free(srcs_view_ptr_ptr[0]) - cugraph_type_erased_device_array_view_free(dsts_view_ptr_ptr[0]) - if self.weights_view_ptr is not NULL: - cugraph_type_erased_device_array_view_free(self.weights_view_ptr_ptr[0]) - if self.edge_id_view_ptr is not NULL: - cugraph_type_erased_device_array_view_free(self.edge_id_view_ptr_ptr[0]) - if edge_type_view_ptr is not NULL: - cugraph_type_erased_device_array_view_free(edge_type_view_ptr_ptr[0]) + for i in range(num_arrays): + cugraph_type_erased_device_array_view_free(srcs_view_ptr_ptr[i]) + cugraph_type_erased_device_array_view_free(dsts_view_ptr_ptr[i]) + if self.weights_view_ptr_ptr is not NULL: + cugraph_type_erased_device_array_view_free(self.weights_view_ptr_ptr[i]) + if self.edge_id_view_ptr_ptr is not NULL: + cugraph_type_erased_device_array_view_free(self.edge_id_view_ptr_ptr[i]) + if edge_type_view_ptr_ptr is not NULL: + cugraph_type_erased_device_array_view_free(edge_type_view_ptr_ptr[i]) def __dealloc__(self): if self.c_graph_ptr is not NULL: