MTMG gather use of streams (#4237)
`mtmg::vertex_result_t::gather` was intermittently returning incorrect (flaky) results.

A couple of issues were identified:
* Stream use was incorrect.  We were using a stream from the stream pool AND the default stream.  Modified to use the default stream throughout, which resolves the issue.  Created #4236 to design a long-term solution.
* The customer use case gathers results for vertices that don't exist in the graph.  Added a mechanism to support this: `gather` now returns a caller-supplied default value for any vertex that doesn't exist (see the usage sketch below).

Closes rapidsai/graph_dl#464
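
A minimal usage sketch of the extended `gather` API, assuming an already-initialized MTMG environment (as in the MTMG threaded tests). The `thread_handle`, `graph_view`, `pageranks_view`, `renumber_map_view`, and `d_vertices` names are illustrative placeholders for objects the caller already owns; only the trailing default-value argument is new:

```cpp
// Sketch only: every name except gather()'s parameters is a placeholder.
auto results = pageranks_view.gather(
  thread_handle,
  raft::device_span<vertex_t const>{d_vertices.data(), d_vertices.size()},
  graph_view.get(thread_handle).vertex_partition_range_lasts(),
  graph_view.get(thread_handle).local_vertex_partition_view(),
  renumber_map_view,
  result_t{0});  // returned for any queried vertex not present in the graph
```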

Authors:
  - Chuck Hastings (https://github.com/ChuckHastings)

Approvers:
  - Seunghwa Kang (https://github.com/seunghwak)
  - Naim (https://github.com/naimnv)

URL: #4237
ChuckHastings authored Mar 13, 2024
1 parent 3086f83 commit 0981a8d
Showing 3 changed files with 125 additions and 79 deletions.
17 changes: 16 additions & 1 deletion cpp/include/cugraph/mtmg/instance_manager.hpp
@@ -1,5 +1,5 @@
/*
- * Copyright (c) 2023, NVIDIA CORPORATION.
+ * Copyright (c) 2023-2024, NVIDIA CORPORATION.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@@ -73,6 +73,21 @@ class instance_manager_t {
return handle_t(*raft_handle_[gpu_id], thread_id, device_ids_[gpu_id]);
}

+/**
+ * @brief Get handle for particular GPU
+ *
+ * Return a handle for a particular GPU. In a context-free environment
+ * this lets the caller reconstitute the handle for the right host thread.
+ * It does assume that the caller will not allow multiple threads to
+ * concurrently use a gpu_id/thread_id pair.
+ *
+ * @return a handle for this thread.
+ */
+handle_t get_handle(int gpu_id, int thread_id = 0)
+{
+  return handle_t(*raft_handle_[gpu_id], thread_id, device_ids_[gpu_id]);
+}

/**
* @brief Reset the thread counter
*
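For context, a minimal sketch of how this new overload might be used from a host thread, assuming a previously constructed `cugraph::mtmg::instance_manager_t` (here named `instance_manager`); the GPU and thread ids are likewise illustrative:

```cpp
// Sketch: a host thread reconstitutes a handle for an explicit GPU.
// `instance_manager` is assumed to be a cugraph::mtmg::instance_manager_t
// created during MTMG setup; the ids below are illustrative.
int gpu_id    = 0;  // the GPU this host thread should drive
int thread_id = 2;  // per-GPU thread index; the caller must ensure that no two
                    // threads use the same (gpu_id, thread_id) pair concurrently
auto handle = instance_manager->get_handle(gpu_id, thread_id);
// ... issue cugraph::mtmg calls through `handle` ...
```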
5 changes: 3 additions & 2 deletions cpp/include/cugraph/mtmg/vertex_result_view.hpp
@@ -1,5 +1,5 @@
/*
- * Copyright (c) 2023, NVIDIA CORPORATION.
+ * Copyright (c) 2023-2024, NVIDIA CORPORATION.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@@ -45,7 +45,8 @@ class vertex_result_view_t : public detail::device_shared_device_span_t<result_t
raft::device_span<vertex_t const> vertices,
std::vector<vertex_t> const& vertex_partition_range_lasts,
cugraph::vertex_partition_view_t<vertex_t, multi_gpu> vertex_partition_view,
-  std::optional<cugraph::mtmg::renumber_map_view_t<vertex_t>>& renumber_map_view);
+  std::optional<cugraph::mtmg::renumber_map_view_t<vertex_t>>& renumber_map_view,
+  result_t default_value = 0);
};

} // namespace mtmg
182 changes: 106 additions & 76 deletions cpp/src/mtmg/vertex_result.cu
@@ -19,6 +19,7 @@
#include <cugraph/detail/utility_wrappers.hpp>
#include <cugraph/graph_functions.hpp>
#include <cugraph/mtmg/vertex_result_view.hpp>
+#include <cugraph/utilities/device_functors.cuh>
#include <cugraph/vertex_partition_device_view.cuh>

#include <thrust/functional.h>
@@ -34,58 +35,71 @@ rmm::device_uvector<result_t> vertex_result_view_t<result_t>::gather(
raft::device_span<vertex_t const> vertices,
std::vector<vertex_t> const& vertex_partition_range_lasts,
vertex_partition_view_t<vertex_t, multi_gpu> vertex_partition_view,
-  std::optional<cugraph::mtmg::renumber_map_view_t<vertex_t>>& renumber_map_view)
+  std::optional<cugraph::mtmg::renumber_map_view_t<vertex_t>>& renumber_map_view,
+  result_t default_value)
{
-  rmm::device_uvector<vertex_t> local_vertices(vertices.size(), handle.get_stream());
-  rmm::device_uvector<int> vertex_gpu_ids(vertices.size(), handle.get_stream());
-  rmm::device_uvector<size_t> vertex_pos(vertices.size(), handle.get_stream());
-  rmm::device_uvector<result_t> result(vertices.size(), handle.get_stream());

-  raft::copy(local_vertices.data(), vertices.data(), vertices.size(), handle.get_stream());
-  cugraph::detail::scalar_fill(
-    handle.get_stream(), vertex_gpu_ids.data(), vertex_gpu_ids.size(), handle.get_rank());
-  cugraph::detail::sequence_fill(
-    handle.get_stream(), vertex_pos.data(), vertex_pos.size(), size_t{0});

-  rmm::device_uvector<vertex_t> d_vertex_partition_range_lasts(vertex_partition_range_lasts.size(),
-                                                               handle.get_stream());
-  raft::update_device(d_vertex_partition_range_lasts.data(),
-                      vertex_partition_range_lasts.data(),
-                      vertex_partition_range_lasts.size(),
-                      handle.get_stream());
+  auto stream = handle.raft_handle().get_stream();

+  rmm::device_uvector<vertex_t> local_vertices(vertices.size(), stream);
+  rmm::device_uvector<int> vertex_gpu_ids(multi_gpu ? vertices.size() : 0, stream);
+  rmm::device_uvector<size_t> vertex_pos(multi_gpu ? vertices.size() : 0, stream);

+  raft::copy(local_vertices.data(), vertices.data(), vertices.size(), stream);

+  if constexpr (multi_gpu) {
+    cugraph::detail::scalar_fill(
+      stream, vertex_gpu_ids.data(), vertex_gpu_ids.size(), handle.get_rank());
+    cugraph::detail::sequence_fill(stream, vertex_pos.data(), vertex_pos.size(), size_t{0});

+    auto const comm_size = handle.raft_handle().get_comms().get_size();
+    auto const major_comm_size =
+      handle.raft_handle().get_subcomm(cugraph::partition_manager::major_comm_name()).get_size();
+    auto const minor_comm_size =
+      handle.raft_handle().get_subcomm(cugraph::partition_manager::minor_comm_name()).get_size();

+    std::forward_as_tuple(local_vertices, std::tie(vertex_gpu_ids, vertex_pos), std::ignore) =
+      groupby_gpu_id_and_shuffle_kv_pairs(
+        handle.raft_handle().get_comms(),
+        local_vertices.begin(),
+        local_vertices.end(),
+        thrust::make_zip_iterator(vertex_gpu_ids.begin(), vertex_pos.begin()),
+        cugraph::detail::compute_gpu_id_from_ext_vertex_t<vertex_t>{
+          comm_size, major_comm_size, minor_comm_size},
+        stream);
+  }

  if (renumber_map_view) {
-    cugraph::renumber_ext_vertices<vertex_t, multi_gpu>(
+    cugraph::renumber_local_ext_vertices<vertex_t, multi_gpu>(
      handle.raft_handle(),
      local_vertices.data(),
      local_vertices.size(),
      renumber_map_view->get(handle).data(),
      vertex_partition_view.local_vertex_partition_range_first(),
      vertex_partition_view.local_vertex_partition_range_last());
-  }

-  auto const major_comm_size =
-    handle.raft_handle().get_subcomm(cugraph::partition_manager::major_comm_name()).get_size();
-  auto const minor_comm_size =
-    handle.raft_handle().get_subcomm(cugraph::partition_manager::minor_comm_name()).get_size();

-  std::forward_as_tuple(local_vertices, std::tie(vertex_gpu_ids, vertex_pos), std::ignore) =
-    groupby_gpu_id_and_shuffle_kv_pairs(
-      handle.raft_handle().get_comms(),
-      local_vertices.begin(),
-      local_vertices.end(),
-      thrust::make_zip_iterator(vertex_gpu_ids.begin(), vertex_pos.begin()),
-      cugraph::detail::compute_gpu_id_from_int_vertex_t<vertex_t>{
-        raft::device_span<vertex_t const>(d_vertex_partition_range_lasts.data(),
-                                          d_vertex_partition_range_lasts.size()),
-        major_comm_size,
-        minor_comm_size},
-      handle.get_stream());
+    size_t new_size = thrust::distance(
+      thrust::make_zip_iterator(local_vertices.begin(), vertex_gpu_ids.begin(), vertex_pos.begin()),
+      thrust::remove_if(
+        rmm::exec_policy(stream),
+        thrust::make_zip_iterator(
+          local_vertices.begin(), vertex_gpu_ids.begin(), vertex_pos.begin()),
+        thrust::make_zip_iterator(local_vertices.end(), vertex_gpu_ids.end(), vertex_pos.end()),
+        [check = cugraph::detail::check_out_of_range_t<vertex_t>{
+           vertex_partition_view.local_vertex_partition_range_first(),
+           vertex_partition_view.local_vertex_partition_range_last()}] __device__(auto tuple) {
+          return check(thrust::get<0>(tuple));
+        }));

+    local_vertices.resize(new_size, stream);
+    vertex_gpu_ids.resize(new_size, stream);
+    vertex_pos.resize(new_size, stream);
+  }

//
// Now gather
//
-  rmm::device_uvector<result_t> tmp_result(local_vertices.size(), handle.get_stream());
+  rmm::device_uvector<result_t> result(local_vertices.size(), stream);
+  cugraph::detail::scalar_fill(stream, result.data(), result.size(), default_value);

auto& wrapped = this->get(handle);

@@ -98,32 +112,36 @@ rmm::device_uvector<result_t> vertex_result_view_t<result_t>::gather(
return vertex_partition.local_vertex_partition_offset_from_vertex_nocheck(v);
}));

-  thrust::gather(handle.get_thrust_policy(),
-                 iter,
-                 iter + local_vertices.size(),
-                 wrapped.begin(),
-                 tmp_result.begin());

-  //
-  // Shuffle back
-  //
-  std::forward_as_tuple(std::ignore, std::tie(std::ignore, vertex_pos, tmp_result), std::ignore) =
-    groupby_gpu_id_and_shuffle_kv_pairs(
-      handle.raft_handle().get_comms(),
-      vertex_gpu_ids.begin(),
-      vertex_gpu_ids.end(),
-      thrust::make_zip_iterator(local_vertices.begin(), vertex_pos.begin(), tmp_result.begin()),
-      thrust::identity{},
-      handle.get_stream());

-  //
-  // Finally, reorder result
-  //
-  thrust::scatter(handle.get_thrust_policy(),
-                  tmp_result.begin(),
-                  tmp_result.end(),
-                  vertex_pos.begin(),
-                  result.begin());
+  thrust::gather(
+    rmm::exec_policy(stream), iter, iter + local_vertices.size(), wrapped.begin(), result.begin());

+  if constexpr (multi_gpu) {
+    rmm::device_uvector<result_t> tmp_result(0, stream);

+    //
+    // Shuffle back
+    //
+    std::forward_as_tuple(std::ignore, std::tie(std::ignore, vertex_pos, tmp_result), std::ignore) =
+      groupby_gpu_id_and_shuffle_kv_pairs(
+        handle.raft_handle().get_comms(),
+        vertex_gpu_ids.begin(),
+        vertex_gpu_ids.end(),
+        thrust::make_zip_iterator(local_vertices.begin(), vertex_pos.begin(), result.begin()),
+        thrust::identity{},
+        stream);

+    //
+    // Finally, reorder result
+    //
+    result.resize(tmp_result.size(), stream);
+    cugraph::detail::scalar_fill(stream, result.data(), result.size(), default_value);

+    thrust::scatter(rmm::exec_policy(stream),
+                    tmp_result.begin(),
+                    tmp_result.end(),
+                    vertex_pos.begin(),
+                    result.begin());
+  }

return result;
}
@@ -133,84 +151,96 @@ template rmm::device_uvector<float> vertex_result_view_t<float>::gather(
raft::device_span<int32_t const> vertices,
std::vector<int32_t> const& vertex_partition_range_lasts,
vertex_partition_view_t<int32_t, false> vertex_partition_view,
-  std::optional<cugraph::mtmg::renumber_map_view_t<int32_t>>& renumber_map_view);
+  std::optional<cugraph::mtmg::renumber_map_view_t<int32_t>>& renumber_map_view,
+  float default_value);

template rmm::device_uvector<float> vertex_result_view_t<float>::gather(
handle_t const& handle,
raft::device_span<int64_t const> vertices,
std::vector<int64_t> const& vertex_partition_range_lasts,
vertex_partition_view_t<int64_t, false> vertex_partition_view,
-  std::optional<cugraph::mtmg::renumber_map_view_t<int64_t>>& renumber_map_view);
+  std::optional<cugraph::mtmg::renumber_map_view_t<int64_t>>& renumber_map_view,
+  float default_value);

template rmm::device_uvector<float> vertex_result_view_t<float>::gather(
handle_t const& handle,
raft::device_span<int32_t const> vertices,
std::vector<int32_t> const& vertex_partition_range_lasts,
vertex_partition_view_t<int32_t, true> vertex_partition_view,
-  std::optional<cugraph::mtmg::renumber_map_view_t<int32_t>>& renumber_map_view);
+  std::optional<cugraph::mtmg::renumber_map_view_t<int32_t>>& renumber_map_view,
+  float default_value);

template rmm::device_uvector<float> vertex_result_view_t<float>::gather(
handle_t const& handle,
raft::device_span<int64_t const> vertices,
std::vector<int64_t> const& vertex_partition_range_lasts,
vertex_partition_view_t<int64_t, true> vertex_partition_view,
-  std::optional<cugraph::mtmg::renumber_map_view_t<int64_t>>& renumber_map_view);
+  std::optional<cugraph::mtmg::renumber_map_view_t<int64_t>>& renumber_map_view,
+  float default_value);

template rmm::device_uvector<double> vertex_result_view_t<double>::gather(
handle_t const& handle,
raft::device_span<int32_t const> vertices,
std::vector<int32_t> const& vertex_partition_range_lasts,
vertex_partition_view_t<int32_t, false> vertex_partition_view,
-  std::optional<cugraph::mtmg::renumber_map_view_t<int32_t>>& renumber_map_view);
+  std::optional<cugraph::mtmg::renumber_map_view_t<int32_t>>& renumber_map_view,
+  double default_value);

template rmm::device_uvector<double> vertex_result_view_t<double>::gather(
handle_t const& handle,
raft::device_span<int64_t const> vertices,
std::vector<int64_t> const& vertex_partition_range_lasts,
vertex_partition_view_t<int64_t, false> vertex_partition_view,
-  std::optional<cugraph::mtmg::renumber_map_view_t<int64_t>>& renumber_map_view);
+  std::optional<cugraph::mtmg::renumber_map_view_t<int64_t>>& renumber_map_view,
+  double default_value);

template rmm::device_uvector<double> vertex_result_view_t<double>::gather(
handle_t const& handle,
raft::device_span<int32_t const> vertices,
std::vector<int32_t> const& vertex_partition_range_lasts,
vertex_partition_view_t<int32_t, true> vertex_partition_view,
-  std::optional<cugraph::mtmg::renumber_map_view_t<int32_t>>& renumber_map_view);
+  std::optional<cugraph::mtmg::renumber_map_view_t<int32_t>>& renumber_map_view,
+  double default_value);

template rmm::device_uvector<double> vertex_result_view_t<double>::gather(
handle_t const& handle,
raft::device_span<int64_t const> vertices,
std::vector<int64_t> const& vertex_partition_range_lasts,
vertex_partition_view_t<int64_t, true> vertex_partition_view,
-  std::optional<cugraph::mtmg::renumber_map_view_t<int64_t>>& renumber_map_view);
+  std::optional<cugraph::mtmg::renumber_map_view_t<int64_t>>& renumber_map_view,
+  double default_value);

template rmm::device_uvector<int32_t> vertex_result_view_t<int32_t>::gather(
handle_t const& handle,
raft::device_span<int32_t const> vertices,
std::vector<int32_t> const& vertex_partition_range_lasts,
vertex_partition_view_t<int32_t, false> vertex_partition_view,
-  std::optional<cugraph::mtmg::renumber_map_view_t<int32_t>>& renumber_map_view);
+  std::optional<cugraph::mtmg::renumber_map_view_t<int32_t>>& renumber_map_view,
+  int32_t default_value);

template rmm::device_uvector<int32_t> vertex_result_view_t<int32_t>::gather(
handle_t const& handle,
raft::device_span<int32_t const> vertices,
std::vector<int32_t> const& vertex_partition_range_lasts,
vertex_partition_view_t<int32_t, true> vertex_partition_view,
-  std::optional<cugraph::mtmg::renumber_map_view_t<int32_t>>& renumber_map_view);
+  std::optional<cugraph::mtmg::renumber_map_view_t<int32_t>>& renumber_map_view,
+  int32_t default_value);

template rmm::device_uvector<int64_t> vertex_result_view_t<int64_t>::gather(
handle_t const& handle,
raft::device_span<int64_t const> vertices,
std::vector<int64_t> const& vertex_partition_range_lasts,
vertex_partition_view_t<int64_t, false> vertex_partition_view,
-  std::optional<cugraph::mtmg::renumber_map_view_t<int64_t>>& renumber_map_view);
+  std::optional<cugraph::mtmg::renumber_map_view_t<int64_t>>& renumber_map_view,
+  int64_t default_value);

template rmm::device_uvector<int64_t> vertex_result_view_t<int64_t>::gather(
handle_t const& handle,
raft::device_span<int64_t const> vertices,
std::vector<int64_t> const& vertex_partition_range_lasts,
vertex_partition_view_t<int64_t, true> vertex_partition_view,
-  std::optional<cugraph::mtmg::renumber_map_view_t<int64_t>>& renumber_map_view);
+  std::optional<cugraph::mtmg::renumber_map_view_t<int64_t>>& renumber_map_view,
+  int64_t default_value);

} // namespace mtmg
} // namespace cugraph
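
The default-value change boils down to a filter-gather-scatter pattern: pre-fill the output with `default_value`, drop queries that fall outside the local vertex range before gathering, and scatter the gathered values back to their original slots so the dropped slots keep the default. A small self-contained Thrust sketch of that pattern, single-GPU and using plain `thrust::device_vector` rather than cuGraph's types (all names here are illustrative, not cuGraph APIs):

```cuda
#include <thrust/device_vector.h>
#include <thrust/distance.h>
#include <thrust/execution_policy.h>
#include <thrust/gather.h>
#include <thrust/iterator/zip_iterator.h>
#include <thrust/remove.h>
#include <thrust/scatter.h>
#include <thrust/sequence.h>
#include <thrust/tuple.h>

#include <cstdio>
#include <vector>

struct out_of_range_t {
  int range_last;  // local values cover vertices [0, range_last)
  __host__ __device__ bool operator()(thrust::tuple<int, int> const& t) const
  {
    int v = thrust::get<0>(t);
    return (v < 0) || (v >= range_last);
  }
};

int main()
{
  std::vector<float> h_values{10.f, 11.f, 12.f, 13.f};  // values for vertices 0..3
  thrust::device_vector<float> values(h_values.begin(), h_values.end());

  std::vector<int> h_queries{2, 7, 0};  // vertex 7 does not exist
  thrust::device_vector<int> queries(h_queries.begin(), h_queries.end());

  float const default_value = -1.f;
  int const n               = static_cast<int>(queries.size());

  // Remember each query's slot in the output before filtering.
  thrust::device_vector<int> positions(n);
  thrust::sequence(positions.begin(), positions.end());

  // 1) Drop (query, position) pairs whose query is out of range.
  auto first = thrust::make_zip_iterator(queries.begin(), positions.begin());
  auto last  = thrust::make_zip_iterator(queries.end(), positions.end());
  auto new_last =
    thrust::remove_if(thrust::device, first, last, out_of_range_t{static_cast<int>(values.size())});
  auto kept = thrust::distance(first, new_last);
  queries.resize(kept);
  positions.resize(kept);

  // 2) Gather values for the surviving queries.
  thrust::device_vector<float> gathered(kept);
  thrust::gather(thrust::device, queries.begin(), queries.end(), values.begin(), gathered.begin());

  // 3) Pre-fill the result with default_value, then scatter the gathered
  //    values back to their original query positions.
  thrust::device_vector<float> result(n, default_value);
  thrust::scatter(thrust::device, gathered.begin(), gathered.end(), positions.begin(), result.begin());

  for (int i = 0; i < n; ++i) {
    std::printf("result[%d] = %.1f\n", i, static_cast<float>(result[i]));
  }
  return 0;  // prints 12.0, -1.0 (the default), 10.0
}
```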
