From a7c6a22a0af77ef6f79bfb3fca3e615bb6356831 Mon Sep 17 00:00:00 2001 From: G-Cornett Date: Thu, 8 Aug 2024 12:11:30 -0700 Subject: [PATCH] Node2Vec MG retool and bug fixes --- cpp/src/sampling/random_walks_impl.cuh | 195 +++++++++++++++++++------ 1 file changed, 152 insertions(+), 43 deletions(-) diff --git a/cpp/src/sampling/random_walks_impl.cuh b/cpp/src/sampling/random_walks_impl.cuh index 024002d28dd..afc7d83af4f 100644 --- a/cpp/src/sampling/random_walks_impl.cuh +++ b/cpp/src/sampling/random_walks_impl.cuh @@ -119,30 +119,32 @@ struct node2vec_random_walk_e_bias_op_t { return 1.0 / p_; } // Search zipped vertices for tagged src - auto lower_itr = thrust::lower_bound( + if(intersection_offsets_.size() > 0){ + auto lower_itr = thrust::lower_bound( thrust::seq, thrust::make_zip_iterator(current_vertices_.begin(), prev_vertices_.begin()), thrust::make_zip_iterator(current_vertices_.end(), prev_vertices_.end()), tagged_src); - auto low_idx = thrust::distance(thrust::make_zip_iterator(current_vertices_.begin(), - prev_vertices_.begin()), - lower_itr); - - auto start_idx = intersection_offsets_[low_idx]; - auto end_idx = intersection_offsets_[low_idx + 1]; - auto itr = thrust::lower_bound( - thrust::seq, - intersection_indices_.begin() + start_idx, - intersection_indices_.begin() + end_idx, - dst); - // dst not in intersection - if(itr == intersection_indices_.begin() + end_idx){ - return 1.0 / q_; + auto low_idx = thrust::distance(thrust::make_zip_iterator(current_vertices_.begin(), + prev_vertices_.begin()), + lower_itr); + if(low_idx >= intersection_offsets_.size() - 1){return 1.0;} + auto start_idx = intersection_offsets_[low_idx]; + auto end_idx = intersection_offsets_[low_idx + 1]; + auto itr = thrust::lower_bound( + thrust::seq, + intersection_indices_.begin() + start_idx, + intersection_indices_.begin() + end_idx, + dst); + // dst not in intersection + if(itr == intersection_indices_.begin() + end_idx){ + return 1.0 / q_; + } } return 1.0; } - // Weighted 
Biase Operator + // Weighted Bias Operator template __device__ std::enable_if_t, bias_t> operator()( thrust::tuple tagged_src, @@ -156,24 +158,27 @@ struct node2vec_random_walk_e_bias_op_t { return 1.0 / p_; } // Search zipped vertices for tagged src - auto lower_itr = thrust::lower_bound( + if(intersection_offsets_.size() > 0){ + auto lower_itr = thrust::lower_bound( thrust::seq, thrust::make_zip_iterator(current_vertices_.begin(), prev_vertices_.begin()), thrust::make_zip_iterator(current_vertices_.end(), prev_vertices_.end()), tagged_src); - auto low_idx = thrust::distance(thrust::make_zip_iterator(current_vertices_.begin(), - prev_vertices_.begin()), - lower_itr); - auto start_idx = intersection_offsets_[low_idx]; - auto end_idx = intersection_offsets_[low_idx + 1]; - auto itr = thrust::lower_bound( - thrust::seq, - intersection_indices_.begin() + start_idx, - intersection_indices_.begin() + end_idx, - dst); - // dst not in intersection - if(itr == intersection_indices_.begin() + end_idx){ - return 1.0 / q_; + auto low_idx = thrust::distance(thrust::make_zip_iterator(current_vertices_.begin(), + prev_vertices_.begin()), + lower_itr); + if(low_idx >= intersection_offsets_.size() - 1){return 1.0;} + auto start_idx = intersection_offsets_[low_idx]; + auto end_idx = intersection_offsets_[low_idx + 1]; + auto itr = thrust::lower_bound( + thrust::seq, + intersection_indices_.begin() + start_idx, + intersection_indices_.begin() + end_idx, + dst); + // dst not in intersection + if(itr == intersection_indices_.begin() + end_idx){ + return 1.0 / q_; + } } return 1.0; } @@ -351,12 +356,12 @@ struct node2vec_selector { std::optional>& previous_vertices) { // To do node2vec, I need the following: - // 1) transform_reduce_dst_nbr_intersection_of_e_endpoints_by_v to compute the sum of the - // node2vec style weights - // 2) Generate a random number between [0, output_from_trdnioeebv[v]) - // 3) a sampling value that lets me pick the correct edge based on the same computation 
- // (essentially weighted sampling, but with a function that computes the weight rather - // than just using the edge weights) + // 1) Find nbr_intersection() CSR + // 2) Convert CSR into COO format, with (p,c,d) triples + // 3) Shuffle COO structure s.t. each entry is on GPU where (c,d) resides + // 4) Revert COO back to CSR + // After this, existing functor should work for symmetric graphs. Add CUGRAPH_EXPECTS to ensure + // no asymmetric graphs and add FIXME // Create vertex frontier using vertex_t = typename GraphViewType::vertex_type; @@ -380,7 +385,103 @@ struct node2vec_selector { intersection_pairs + current_vertices.size(), std::array{true, true}, false); + + if constexpr (GraphViewType::is_multi_gpu) { + // Create COO data structures + rmm::device_uvector current_COO_buffer(intersection_indices.size(), + handle.get_stream()); + rmm::device_uvector destination_COO_buffer(intersection_indices.size(), + handle.get_stream()); + auto intersection_COO = thrust::make_zip_iterator(current_COO_buffer.begin(), + destination_COO_buffer.begin()); + + // Fill COO data structure + thrust::for_each( + handle.get_thrust_policy(), + thrust::make_counting_iterator(size_t{0}), + thrust::make_counting_iterator(intersection_indices.size()), + [intersection_COO, + current_vertices = raft::device_span(current_vertices.data(), current_vertices.size()), + intersection_offsets = raft::device_span(intersection_offsets.data(), intersection_offsets.size()), + intersection_indices = raft::device_span(intersection_indices.data(), intersection_indices.size())] + __device__ (auto i) { + auto itr = thrust::upper_bound(thrust::seq, + intersection_offsets.begin(), + intersection_offsets.end(), + i); + auto idx = thrust::distance(intersection_offsets.begin(), itr); + intersection_COO[i] = thrust::make_tuple(current_vertices[idx - 1], + intersection_indices[i]); + }); + + // Create device partition range + rmm::device_uvector 
d_vertex_partition_range_lasts(graph_view.vertex_partition_range_lasts().size(), + handle.get_stream()); + raft::update_device(d_vertex_partition_range_lasts.data(), + graph_view.vertex_partition_range_lasts().data(), + graph_view.vertex_partition_range_lasts().size(), + handle.get_stream()); + + // Shuffle by current-destination edge endpoints + std::forward_as_tuple(std::tie(current_COO_buffer, destination_COO_buffer), std::ignore) = + cugraph::groupby_gpu_id_and_shuffle_values( + handle.get_comms(), + intersection_COO, + intersection_COO + intersection_indices.size(), + [key_func = cugraph::detail::compute_gpu_id_from_int_edge_endpoints_t{ + raft::device_span(d_vertex_partition_range_lasts.data(), d_vertex_partition_range_lasts.size()), + handle.get_comms().get_size(), + handle.get_subcomm(cugraph::partition_manager::major_comm_name()).get_size(), + handle.get_subcomm(cugraph::partition_manager::minor_comm_name()).get_size()}] __device__ (auto tuple) { + return key_func(thrust::get<0>(tuple), thrust::get<1>(tuple)); + }, + handle.get_stream()); + + // Sort post-shuffle for CSR conversion + thrust::sort(handle.get_thrust_policy(), + thrust::make_zip_iterator(current_COO_buffer.begin(), + destination_COO_buffer.begin()), + thrust::make_zip_iterator(current_COO_buffer.end(), + destination_COO_buffer.end())); + if(current_COO_buffer.size() > 0){ + // Resize intersection data structures + intersection_offsets.resize(current_COO_buffer.back_element(handle.get_stream()) + 2, handle.get_stream()); + intersection_offsets.shrink_to_fit(handle.get_stream()); + intersection_indices.resize(destination_COO_buffer.size(), handle.get_stream()); + intersection_indices.shrink_to_fit(handle.get_stream()); + intersection_offsets.set_element_to_zero_async(intersection_offsets.size() - 1, handle.get_stream()); + + // Count entries per current-vertex id to build the CSR offsets. FIXME: this does not convert correctly. NOTE(review): the offsets built here are indexed by vertex id and the COO no longer carries the previous vertex, whereas the bias functor indexes intersection_offsets_ by position in the zipped (current, previous) vertex list; each offset also costs a full thrust::count pass over the buffer -- confirm and rework.
+ thrust::for_each(handle.get_thrust_policy(), + thrust::make_counting_iterator(size_t{0}), + thrust::make_counting_iterator(intersection_offsets.size() - 1), + [current_COO_buffer = raft::device_span(current_COO_buffer.data(), current_COO_buffer.size()), + intersection_offsets = intersection_offsets.data()] __device__ (auto i){ + auto count = thrust::count(thrust::seq, + current_COO_buffer.begin(), + current_COO_buffer.end(), + i); + intersection_offsets[i] = count; + }); + } else { + intersection_offsets.resize(0, handle.get_stream()); + intersection_offsets.shrink_to_fit(handle.get_stream()); + intersection_indices.resize(0, handle.get_stream()); + intersection_indices.shrink_to_fit(handle.get_stream()); + } + // Scan to fill offsets + thrust::exclusive_scan(handle.get_thrust_policy(), + intersection_offsets.begin(), + intersection_offsets.end(), + intersection_offsets.begin()); + + // Copy indices + thrust::copy(handle.get_thrust_policy(), + destination_COO_buffer.begin(), + destination_COO_buffer.end(), + intersection_indices.data()); + } // Create data structs for results rmm::device_uvector minors(0, handle.get_stream()); std::optional> weights{std::nullopt}; @@ -442,11 +543,10 @@ struct node2vec_selector { } // Copy current vertices to previous vertices for two-order walk - thrust::copy( - handle.get_thrust_policy(), - current_vertices.begin(), - current_vertices.end(), - (*previous_vertices).data()); + thrust::copy(handle.get_thrust_policy(), + current_vertices.begin(), + current_vertices.end(), + (*previous_vertices).data()); return std::make_tuple(std::move(minors), std::move(weights)); } @@ -732,11 +832,20 @@ random_walk_impl(raft::handle_t const& handle, // Moved out of if statements to cut down on code duplication current_vertices.resize(compacted_length, handle.get_stream()); + current_vertices.shrink_to_fit(handle.get_stream()); current_position.resize(compacted_length, handle.get_stream()); - if (result_weights) 
{new_weights->resize(compacted_length, handle.get_stream());} - if (previous_vertices) {previous_vertices->resize(compacted_length, handle.get_stream());} + current_position.shrink_to_fit(handle.get_stream()); + if (result_weights) { + new_weights->resize(compacted_length, handle.get_stream()); + new_weights->shrink_to_fit(handle.get_stream()); + } + if (previous_vertices) { + previous_vertices->resize(compacted_length, handle.get_stream()); + previous_vertices->shrink_to_fit(handle.get_stream()); + } if constexpr (multi_gpu) { current_gpu.resize(compacted_length, handle.get_stream()); + current_gpu.shrink_to_fit(handle.get_stream()); // Shuffle back to original GPU if (previous_vertices) {