Skip to content

Commit

Permalink
Node2Vec MG retool and bug fixes
Browse files Browse the repository at this point in the history
  • Loading branch information
G-Cornett committed Aug 8, 2024
1 parent 941086e commit a7c6a22
Showing 1 changed file with 152 additions and 43 deletions.
195 changes: 152 additions & 43 deletions cpp/src/sampling/random_walks_impl.cuh
Original file line number Diff line number Diff line change
Expand Up @@ -119,30 +119,32 @@ struct node2vec_random_walk_e_bias_op_t {
return 1.0 / p_;
}
// Search zipped vertices for tagged src
auto lower_itr = thrust::lower_bound(
if(intersection_offsets_.size() > 0){
auto lower_itr = thrust::lower_bound(
thrust::seq,
thrust::make_zip_iterator(current_vertices_.begin(), prev_vertices_.begin()),
thrust::make_zip_iterator(current_vertices_.end(), prev_vertices_.end()),
tagged_src);
auto low_idx = thrust::distance(thrust::make_zip_iterator(current_vertices_.begin(),
prev_vertices_.begin()),
lower_itr);

auto start_idx = intersection_offsets_[low_idx];
auto end_idx = intersection_offsets_[low_idx + 1];
auto itr = thrust::lower_bound(
thrust::seq,
intersection_indices_.begin() + start_idx,
intersection_indices_.begin() + end_idx,
dst);
// dst not in intersection
if(itr == intersection_indices_.begin() + end_idx){
return 1.0 / q_;
auto low_idx = thrust::distance(thrust::make_zip_iterator(current_vertices_.begin(),
prev_vertices_.begin()),
lower_itr);
if(low_idx >= intersection_offsets_.size() - 1){return 1.0;}
auto start_idx = intersection_offsets_[low_idx];
auto end_idx = intersection_offsets_[low_idx + 1];
auto itr = thrust::lower_bound(
thrust::seq,
intersection_indices_.begin() + start_idx,
intersection_indices_.begin() + end_idx,
dst);
// dst not in intersection
if(itr == intersection_indices_.begin() + end_idx){
return 1.0 / q_;
}
}
return 1.0;
}

// Weighted Biase Operator
// Weighted Bias Operator
template <typename W = weight_t>
__device__ std::enable_if_t<!std::is_same_v<W, void>, bias_t> operator()(
thrust::tuple<vertex_t, vertex_t> tagged_src,
Expand All @@ -156,24 +158,27 @@ struct node2vec_random_walk_e_bias_op_t {
return 1.0 / p_;
}
// Search zipped vertices for tagged src
auto lower_itr = thrust::lower_bound(
if(intersection_offsets_.size() > 0){
auto lower_itr = thrust::lower_bound(
thrust::seq,
thrust::make_zip_iterator(current_vertices_.begin(), prev_vertices_.begin()),
thrust::make_zip_iterator(current_vertices_.end(), prev_vertices_.end()),
tagged_src);
auto low_idx = thrust::distance(thrust::make_zip_iterator(current_vertices_.begin(),
prev_vertices_.begin()),
lower_itr);
auto start_idx = intersection_offsets_[low_idx];
auto end_idx = intersection_offsets_[low_idx + 1];
auto itr = thrust::lower_bound(
thrust::seq,
intersection_indices_.begin() + start_idx,
intersection_indices_.begin() + end_idx,
dst);
// dst not in intersection
if(itr == intersection_indices_.begin() + end_idx){
return 1.0 / q_;
auto low_idx = thrust::distance(thrust::make_zip_iterator(current_vertices_.begin(),
prev_vertices_.begin()),
lower_itr);
if(low_idx >= intersection_offsets_.size() - 1){return 1.0;}
auto start_idx = intersection_offsets_[low_idx];
auto end_idx = intersection_offsets_[low_idx + 1];
auto itr = thrust::lower_bound(
thrust::seq,
intersection_indices_.begin() + start_idx,
intersection_indices_.begin() + end_idx,
dst);
// dst not in intersection
if(itr == intersection_indices_.begin() + end_idx){
return 1.0 / q_;
}
}
return 1.0;
}
Expand Down Expand Up @@ -351,12 +356,12 @@ struct node2vec_selector {
std::optional<rmm::device_uvector<typename GraphViewType::vertex_type>>& previous_vertices)
{
// To do node2vec, I need the following:
// 1) transform_reduce_dst_nbr_intersection_of_e_endpoints_by_v to compute the sum of the
// node2vec style weights
// 2) Generate a random number between [0, output_from_trdnioeebv[v])
// 3) a sampling value that lets me pick the correct edge based on the same computation
// (essentially weighted sampling, but with a function that computes the weight rather
// than just using the edge weights)
// 1) Find nbr_intersection() CSR
// 2) Convert CSR into COO format, with (p,c,d) triples
// 3) Shuffle COO structure s.t. each entry is on GPU where (c,d) resides
// 4) Revert COO back to CSR
// After this, the existing functor should work for symmetric graphs.
// TODO: add a CUGRAPH_EXPECTS check that rejects asymmetric graphs, plus a FIXME noting the limitation.

// Create vertex frontier
using vertex_t = typename GraphViewType::vertex_type;
Expand All @@ -380,7 +385,103 @@ struct node2vec_selector {
intersection_pairs + current_vertices.size(),
std::array<bool, 2>{true, true},
false);

if constexpr (GraphViewType::is_multi_gpu) {
// Create COO data structures
rmm::device_uvector<vertex_t> current_COO_buffer(intersection_indices.size(),
handle.get_stream());
rmm::device_uvector<vertex_t> destination_COO_buffer(intersection_indices.size(),
handle.get_stream());
auto intersection_COO = thrust::make_zip_iterator(current_COO_buffer.begin(),
destination_COO_buffer.begin());

// Fill COO data structure
thrust::for_each(
handle.get_thrust_policy(),
thrust::make_counting_iterator(size_t{0}),
thrust::make_counting_iterator(intersection_indices.size()),
[intersection_COO,
current_vertices = raft::device_span<vertex_t>(current_vertices.data(), current_vertices.size()),
intersection_offsets = raft::device_span<size_t>(intersection_offsets.data(), intersection_offsets.size()),
intersection_indices = raft::device_span<vertex_t>(intersection_indices.data(), intersection_indices.size())]
__device__ (auto i) {
auto itr = thrust::upper_bound(thrust::seq,
intersection_offsets.begin(),
intersection_offsets.end(),
i);
auto idx = thrust::distance(intersection_offsets.begin(), itr);
intersection_COO[i] = thrust::make_tuple(current_vertices[idx - 1],
intersection_indices[i]);
});

// Create device partition range
rmm::device_uvector<vertex_t> d_vertex_partition_range_lasts(graph_view.vertex_partition_range_lasts().size(),
handle.get_stream());
raft::update_device(d_vertex_partition_range_lasts.data(),
graph_view.vertex_partition_range_lasts().data(),
graph_view.vertex_partition_range_lasts().size(),
handle.get_stream());

// Shuffle by current-destination edge endpoints
std::forward_as_tuple(std::tie(current_COO_buffer, destination_COO_buffer), std::ignore) =
cugraph::groupby_gpu_id_and_shuffle_values(
handle.get_comms(),
intersection_COO,
intersection_COO + intersection_indices.size(),
[key_func = cugraph::detail::compute_gpu_id_from_int_edge_endpoints_t<vertex_t>{
raft::device_span<vertex_t const>(d_vertex_partition_range_lasts.data(), d_vertex_partition_range_lasts.size()),
handle.get_comms().get_size(),
handle.get_subcomm(cugraph::partition_manager::major_comm_name()).get_size(),
handle.get_subcomm(cugraph::partition_manager::minor_comm_name()).get_size()}] __device__ (auto tuple) {
return key_func(thrust::get<0>(tuple), thrust::get<1>(tuple));
},
handle.get_stream());

// Sort post-shuffle for CSR conversion
thrust::sort(handle.get_thrust_policy(),
thrust::make_zip_iterator(current_COO_buffer.begin(),
destination_COO_buffer.begin()),
thrust::make_zip_iterator(current_COO_buffer.end(),
destination_COO_buffer.end()));

if(current_COO_buffer.size() > 0){
// Resize intersection data structures
intersection_offsets.resize(current_COO_buffer.back_element(handle.get_stream()) + 2, handle.get_stream());
intersection_offsets.shrink_to_fit(handle.get_stream());
intersection_indices.resize(destination_COO_buffer.size(), handle.get_stream());
intersection_indices.shrink_to_fit(handle.get_stream());
intersection_offsets.set_element_to_zero_async(intersection_offsets.size() - 1, handle.get_stream());

// Count offset sizes. FIXME: this conversion does not produce correct results yet.
thrust::for_each(handle.get_thrust_policy(),
thrust::make_counting_iterator(size_t{0}),
thrust::make_counting_iterator(intersection_offsets.size() - 1),
[current_COO_buffer = raft::device_span<vertex_t>(current_COO_buffer.data(), current_COO_buffer.size()),
intersection_offsets = intersection_offsets.data()] __device__ (auto i){
auto count = thrust::count(thrust::seq,
current_COO_buffer.begin(),
current_COO_buffer.end(),
i);
intersection_offsets[i] = count;
});
} else {
intersection_offsets.resize(0, handle.get_stream());
intersection_offsets.shrink_to_fit(handle.get_stream());
intersection_indices.resize(0, handle.get_stream());
intersection_indices.shrink_to_fit(handle.get_stream());
}
// Scan to fill offsets
thrust::exclusive_scan(handle.get_thrust_policy(),
intersection_offsets.begin(),
intersection_offsets.end(),
intersection_offsets.begin());

// Copy indices
thrust::copy(handle.get_thrust_policy(),
destination_COO_buffer.begin(),
destination_COO_buffer.end(),
intersection_indices.data());
}
// Create data structs for results
rmm::device_uvector<vertex_t> minors(0, handle.get_stream());
std::optional<rmm::device_uvector<weight_t>> weights{std::nullopt};
Expand Down Expand Up @@ -442,11 +543,10 @@ struct node2vec_selector {
}

// Copy current vertices to previous vertices for two-order walk
thrust::copy(
handle.get_thrust_policy(),
current_vertices.begin(),
current_vertices.end(),
(*previous_vertices).data());
thrust::copy(handle.get_thrust_policy(),
current_vertices.begin(),
current_vertices.end(),
(*previous_vertices).data());

return std::make_tuple(std::move(minors), std::move(weights));
}
Expand Down Expand Up @@ -732,11 +832,20 @@ random_walk_impl(raft::handle_t const& handle,

// Moved out of if statements to cut down on code duplication
current_vertices.resize(compacted_length, handle.get_stream());
current_vertices.shrink_to_fit(handle.get_stream());
current_position.resize(compacted_length, handle.get_stream());
if (result_weights) {new_weights->resize(compacted_length, handle.get_stream());}
if (previous_vertices) {previous_vertices->resize(compacted_length, handle.get_stream());}
current_position.shrink_to_fit(handle.get_stream());
if (result_weights) {
new_weights->resize(compacted_length, handle.get_stream());
new_weights->shrink_to_fit(handle.get_stream());
}
if (previous_vertices) {
previous_vertices->resize(compacted_length, handle.get_stream());
previous_vertices->shrink_to_fit(handle.get_stream());
}
if constexpr (multi_gpu) {
current_gpu.resize(compacted_length, handle.get_stream());
current_gpu.shrink_to_fit(handle.get_stream());

// Shuffle back to original GPU
if (previous_vertices) {
Expand Down

0 comments on commit a7c6a22

Please sign in to comment.